This is an automated email from the ASF dual-hosted git repository. mahesh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 7411d42 HIVE-22736 : Support replication across multiple encryption zones. (Aasha Medhi, reviewed by Mahesh Kumar Behera) 7411d42 is described below commit 7411d42579ffa0bad96e8da731a1a35afc9ff614 Author: Aasha Medhi <aasha.medhi2...@gmail.com> AuthorDate: Fri Feb 7 07:16:24 2020 +0530 HIVE-22736 : Support replication across multiple encryption zones. (Aasha Medhi, reviewed by Mahesh Kumar Behera) Signed-off-by: Mahesh Kumar Behera <mah...@apache.org> --- .../java/org/apache/hadoop/hive/conf/HiveConf.java | 5 + .../TestMetaStoreMultipleEncryptionZones.java | 1434 ++++++++++++++++++++ .../hive/metastore/TestReplChangeManager.java | 1 + .../cache/TestCachedStoreUpdateUsingEvents.java | 2 + .../hive/ql/metadata/TestAlterTableMetadata.java | 1 + .../java/org/apache/hive/jdbc/TestJdbcDriver2.java | 4 + .../org/apache/hive/jdbc/TestJdbcWithMiniHS2.java | 1 + .../ql/ddl/table/create/CreateTableOperation.java | 1 - .../table/storage/AlterTableArchiveOperation.java | 4 +- .../ddl/table/storage/AlterTableArchiveUtils.java | 5 +- .../storage/AlterTableSetLocationOperation.java | 1 + .../storage/AlterTableUnarchiveOperation.java | 6 +- .../org/apache/hadoop/hive/ql/metadata/Hive.java | 4 +- .../hive/ql/parse/ImportSemanticAnalyzer.java | 4 +- .../hadoop/hive/ql/txn/compactor/Cleaner.java | 4 +- .../apache/hadoop/hive/shims/Hadoop23Shims.java | 2 +- .../org/apache/hadoop/hive/shims/HadoopShims.java | 15 +- standalone-metastore/metastore-common/pom.xml | 5 + .../hadoop/hive/metastore/ReplChangeManager.java | 187 ++- .../hadoop/hive/metastore/conf/MetastoreConf.java | 5 + .../apache/hadoop/hive/metastore/utils/Retry.java | 52 + .../hadoop/hive/metastore/utils/package-info.java | 22 + .../hadoop/hive/metastore/utils/RetryTest.java | 57 + .../hadoop/hive/metastore/utils/package-info.java | 22 + .../hadoop/hive/metastore/HiveAlterHandler.java | 7 +- .../hadoop/hive/metastore/HiveMetaStore.java | 120 +- 26 files changed, 1886 insertions(+), 85 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 12a022c..a120b45 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -452,6 +452,11 @@ public class HiveConf extends Configuration { REPLCMRETIAN("hive.repl.cm.retain","24h", new TimeValidator(TimeUnit.HOURS), "Time to retain removed files in cmrootdir."), + REPLCMENCRYPTEDDIR("hive.repl.cm.encryptionzone.rootdir", ".cmroot", + "Root dir for ChangeManager if encryption zones are enabled, used for deleted files."), + REPLCMFALLBACKNONENCRYPTEDDIR("hive.repl.cm.nonencryptionzone.rootdir", + "/user/${system:user.name}/cmroot/", + "Root dir for ChangeManager for non encrypted paths if hive.repl.cmrootdir is encrypted."), REPLCMINTERVAL("hive.repl.cm.interval","3600s", new TimeValidator(TimeUnit.SECONDS), "Inteval for cmroot cleanup thread."), diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMultipleEncryptionZones.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMultipleEncryptionZones.java new file mode 100644 index 0000000..51bb787 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestMetaStoreMultipleEncryptionZones.java @@ -0,0 +1,1434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.ReplChangeManager.RecycleType; + +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.hive.shims.HadoopShims; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.hive.metastore.client.builder.DatabaseBuilder; +import org.apache.hadoop.hive.metastore.client.builder.TableBuilder; +import org.apache.hadoop.hive.metastore.api.Type; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.thrift.TException; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; + +/** + * TestMetaStoreAuthorization. + */ +public class TestMetaStoreMultipleEncryptionZones { + private static HiveMetaStoreClient client; + private static HiveConf hiveConf; + private static Configuration conf; + private static Warehouse warehouse; + private static FileSystem warehouseFs; + private static MiniDFSCluster miniDFSCluster; + private static String cmroot; + private static FileSystem fs; + private static HadoopShims.HdfsEncryptionShim shimCm; + private static String cmrootEncrypted; + private static String jksFile = System.getProperty("java.io.tmpdir") + "/test.jks"; + private static String cmrootFallBack; + + @BeforeClass + public static void setUp() throws Exception { + //Create secure cluster + conf = new Configuration(); + conf.set("hadoop.security.key.provider.path", "jceks://file" + jksFile); + miniDFSCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); + DFSTestUtil.createKey("test_key_cm", miniDFSCluster, conf); + DFSTestUtil.createKey("test_key_db", miniDFSCluster, conf); + hiveConf = new HiveConf(TestReplChangeManager.class); + hiveConf.setBoolean(HiveConf.ConfVars.REPLCMENABLED.varname, true); + hiveConf.setInt(CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY, 60); + hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, + "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort() + + HiveConf.ConfVars.METASTOREWAREHOUSE.defaultStrVal); + + cmroot = "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort() + "/cmroot"; + cmrootFallBack = "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort() + "/cmrootFallback"; + cmrootEncrypted = "cmrootEncrypted"; + hiveConf.set(HiveConf.ConfVars.REPLCMDIR.varname, cmroot); + hiveConf.set(HiveConf.ConfVars.REPLCMENCRYPTEDDIR.varname, cmrootEncrypted); + hiveConf.set(HiveConf.ConfVars.REPLCMFALLBACKNONENCRYPTEDDIR.varname, cmrootFallBack); + initReplChangeManager(); + //Create cm in encrypted zone + shimCm = ShimLoader.getHadoopShims().createHdfsEncryptionShim(fs, conf); + + try { + client = new HiveMetaStoreClient(hiveConf); + } catch (Throwable e) { + System.err.println("Unable to open the metastore"); + System.err.println(StringUtils.stringifyException(e)); + throw e; + } + } + + private static void initReplChangeManager() throws Exception{ + warehouse = new Warehouse(hiveConf); + warehouseFs = warehouse.getWhRoot().getFileSystem(hiveConf); + fs = new Path(cmroot).getFileSystem(hiveConf); + fs.mkdirs(warehouse.getWhRoot()); + } + + @AfterClass + public static void tearDown() throws Exception { + try { + miniDFSCluster.shutdown(); + client.close(); + } catch (Throwable e) { + System.err.println("Unable to close metastore"); + System.err.println(StringUtils.stringifyException(e)); + throw e; + } + } + + @Test + public void dropTableWithDifferentEncryptionZonesDifferentKey() throws Throwable { + String dbName1 = "encrdbdiffkey1"; + String dbName2 = "encrdbdiffkey2"; + String tblName1 = "encrtbl1"; + String tblName2 = "encrtbl2"; + String typeName = "Person"; + + silentDropDatabase(dbName1); + silentDropDatabase(dbName2); + new DatabaseBuilder() + .setName(dbName1) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + new DatabaseBuilder() + .setName(dbName2) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + Path dirDb1 = new Path(warehouse.getWhRoot(), dbName1 +".db"); + warehouseFs.delete(dirDb1, true); + warehouseFs.mkdirs(dirDb1); + shimCm.createEncryptionZone(dirDb1, "test_key_db"); + Path dirTbl1 = new Path(dirDb1, tblName1); + warehouseFs.mkdirs(dirTbl1); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + Path dirDb2 = new Path(warehouse.getWhRoot(), dbName2 +".db"); + warehouseFs.delete(dirDb2, true); + warehouseFs.mkdirs(dirDb2); + shimCm.createEncryptionZone(dirDb2, "test_key_cm"); + Path dirTbl2 = new Path(dirDb2, tblName2); + warehouseFs.mkdirs(dirTbl2); + Path part12 = new Path(dirTbl2, "part1"); + createFile(part12, "testClearer12"); + + new TableBuilder() + .setDbName(dbName1) + .setTableName(tblName1) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName1, tblName1); + Assert.assertNotNull(tbl); + + new TableBuilder() + .setDbName(dbName2) + .setTableName(tblName2) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + boolean exceptionThrown = false; + try { + client.dropTable(dbName1, tblName1); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName1, tblName1); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + exceptionThrown = false; + try { + client.dropTable(dbName2, tblName2); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part12)); + try { + client.getTable(dbName2, tblName2); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + + + } + + @Test + public void dropTableWithTableAtEncryptionZoneRoot() throws Throwable { + String dbName = "encrdbroot"; + String tblName1 = "encrtbl1"; + String tblName2 = "encrtbl2"; + String typeName = "Person"; + + silentDropDatabase(dbName); + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName1) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName1); + Assert.assertNotNull(tbl); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName2) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.mkdirs(dirDb); + Path dirTbl1 = new Path(dirDb, tblName1); + warehouseFs.mkdirs(dirTbl1); + shimCm.createEncryptionZone(dirTbl1, "test_key_db"); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + Path dirTbl2 = new Path(dirDb, tblName2); + warehouseFs.mkdirs(dirTbl2); + shimCm.createEncryptionZone(dirTbl2, "test_key_cm"); + Path part12 = new Path(dirTbl2, "part1"); + createFile(part12, "testClearer12"); + + boolean exceptionThrown = false; + try { + client.dropTable(dbName, tblName1); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName1); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + exceptionThrown = false; + try { + client.dropTable(dbName, tblName2); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part12)); + try { + client.getTable(dbName, tblName2); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + assertTrue(warehouseFs.exists(new Path(dirTbl1, cmrootEncrypted))); + assertTrue(warehouseFs.exists(new Path(dirTbl2, cmrootEncrypted))); + } + + @Test + public void dropTableWithDifferentEncryptionZonesSameKey() throws Throwable { + String dbName1 = "encrdbsamekey1"; + String dbName2 = "encrdbsamekey2"; + String tblName1 = "encrtbl1"; + String tblName2 = "encrtbl2"; + String typeName = "Person"; + + silentDropDatabase(dbName1); + silentDropDatabase(dbName2); + new DatabaseBuilder() + .setName(dbName1) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + new DatabaseBuilder() + .setName(dbName2) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + Path dirDb1 = new Path(warehouse.getWhRoot(), dbName1 +".db"); + warehouseFs.mkdirs(dirDb1); + shimCm.createEncryptionZone(dirDb1, "test_key_db"); + Path dirTbl1 = new Path(dirDb1, tblName1); + warehouseFs.mkdirs(dirTbl1); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + Path dirDb2 = new Path(warehouse.getWhRoot(), dbName2 +".db"); + warehouseFs.mkdirs(dirDb2); + shimCm.createEncryptionZone(dirDb2, "test_key_db"); + Path dirTbl2 = new Path(dirDb2, tblName2); + warehouseFs.mkdirs(dirTbl2); + Path part12 = new Path(dirTbl2, "part1"); + createFile(part12, "testClearer12"); + + new TableBuilder() + .setDbName(dbName1) + .setTableName(tblName1) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName1, tblName1); + Assert.assertNotNull(tbl); + + new TableBuilder() + .setDbName(dbName2) + .setTableName(tblName2) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + boolean exceptionThrown = false; + try { + client.dropTable(dbName1, tblName1); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName1, tblName1); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + exceptionThrown = false; + try { + client.dropTable(dbName2, tblName2); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part12)); + try { + client.getTable(dbName2, tblName2); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + + } + + @Test + public void dropTableWithSameEncryptionZones() throws Throwable { + String dbName = "encrdb3"; + String tblName1 = "encrtbl1"; + String tblName2 = "encrtbl2"; + String typeName = "Person"; + silentDropDatabase(dbName); + + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName1) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName1); + Assert.assertNotNull(tbl); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName2) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.delete(dirDb, true); + warehouseFs.mkdirs(dirDb); + shimCm.createEncryptionZone(dirDb, "test_key_db"); + Path dirTbl1 = new Path(dirDb, tblName1); + warehouseFs.mkdirs(dirTbl1); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + Path dirTbl2 = new Path(dirDb, tblName2); + warehouseFs.mkdirs(dirTbl2); + Path part12 = new Path(dirTbl2, "part1"); + createFile(part12, "testClearer12"); + + boolean exceptionThrown = false; + try { + client.dropTable(dbName, tblName1); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName1); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + exceptionThrown = false; + try { + client.dropTable(dbName, tblName2); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part12)); + try { + client.getTable(dbName, tblName2); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + } + + @Test + public void dropTableWithoutEncryptionZonesForCm() throws Throwable { + String dbName = "simpdb1"; + String tblName = "simptbl"; + String typeName = "Person"; + silentDropDatabase(dbName); + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName); + Assert.assertNotNull(tbl); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.mkdirs(dirDb); + Path dirTbl1 = new Path(dirDb, tblName); + warehouseFs.mkdirs(dirTbl1); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + boolean exceptionThrown = false; + try { + client.dropTable(dbName, tblName); + } catch (Exception e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + } + + @Test + public void dropExternalTableWithSameEncryptionZonesForCm() throws Throwable { + String dbName = "encrdb4"; + String tblName1 = "encrtbl1"; + String tblName2 = "encrtbl2"; + String typeName = "Person"; + silentDropDatabase(dbName); + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName1) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addTableParam("EXTERNAL", "true") + .addTableParam("external.table.purge", "true") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName1); + Assert.assertNotNull(tbl); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName2) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addTableParam("EXTERNAL", "true") + .addTableParam("external.table.purge", "true") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.delete(dirDb, true); + warehouseFs.mkdirs(dirDb); + shimCm.createEncryptionZone(dirDb, "test_key_db"); + Path dirTbl1 = new Path(dirDb, tblName1); + warehouseFs.mkdirs(dirTbl1); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + Path dirTbl2 = new Path(dirDb, tblName2); + warehouseFs.mkdirs(dirTbl2); + Path part12 = new Path(dirTbl2, "part1"); + createFile(part12, "testClearer12"); + + boolean exceptionThrown = false; + try { + client.dropTable(dbName, tblName1); + } catch (MetaException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName1); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + exceptionThrown = false; + try { + client.dropTable(dbName, tblName2); + } catch (MetaException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName2); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + } + + @Test + public void dropExternalTableWithDifferentEncryptionZones() throws Throwable { + String dbName = "encrdb5"; + String tblName1 = "encrtbl1"; + String tblName2 = "encrtbl2"; + String typeName = "Person"; + + silentDropDatabase(dbName); + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName1) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addTableParam("EXTERNAL", "true") + .addTableParam("external.table.purge", "true") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName1); + Assert.assertNotNull(tbl); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName2) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addTableParam("EXTERNAL", "true") + .addTableParam("external.table.purge", "true") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.mkdirs(dirDb); + Path dirTbl1 = new Path(dirDb, tblName1); + warehouseFs.mkdirs(dirTbl1); + shimCm.createEncryptionZone(dirTbl1, "test_key_db"); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + Path dirTbl2 = new Path(dirDb, tblName2); + warehouseFs.mkdirs(dirTbl2); + shimCm.createEncryptionZone(dirTbl2, "test_key_db"); + Path part12 = new Path(dirTbl2, "part1"); + createFile(part12, "testClearer12"); + + boolean exceptionThrown = false; + try { + client.dropTable(dbName, tblName1); + } catch (MetaException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName1); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + exceptionThrown = false; + try { + client.dropTable(dbName, tblName2); + } catch (MetaException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part12)); + try { + client.getTable(dbName, tblName2); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + } + + @Test + public void dropExternalTableWithDifferentEncryptionZonesDifferentKey() throws Throwable { + String dbName = "encrdb6"; + String tblName1 = "encrtbl1"; + String tblName2 = "encrtbl2"; + String typeName = "Person"; + + silentDropDatabase(dbName); + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName1) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addTableParam("EXTERNAL", "true") + .addTableParam("external.table.purge", "true") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName1); + Assert.assertNotNull(tbl); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName2) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addTableParam("EXTERNAL", "true") + .addTableParam("external.table.purge", "true") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.mkdirs(dirDb); + Path dirTbl1 = new Path(dirDb, tblName1); + warehouseFs.mkdirs(dirTbl1); + shimCm.createEncryptionZone(dirTbl1, "test_key_db"); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + Path dirTbl2 = new Path(dirDb, tblName2); + warehouseFs.mkdirs(dirTbl2); + shimCm.createEncryptionZone(dirTbl2, "test_key_cm"); + Path part12 = new Path(dirTbl2, "part1"); + createFile(part12, "testClearer12"); + + boolean exceptionThrown = false; + try { + client.dropTable(dbName, tblName1); + } catch (MetaException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName1); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + exceptionThrown = false; + try { + client.dropTable(dbName, tblName2); + } catch (MetaException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part12)); + try { + client.getTable(dbName, tblName2); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + } + + @Test + public void dropExternalTableWithoutEncryptionZonesForCm() throws Throwable { + String dbName = "simpdb2"; + String tblName = "simptbl"; + String typeName = "Person"; + silentDropDatabase(dbName); + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addTableParam("EXTERNAL", "true") + .addTableParam("external.table.purge", "true") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName); + Assert.assertNotNull(tbl); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.mkdirs(dirDb); + Path dirTbl1 = new Path(dirDb, tblName); + warehouseFs.mkdirs(dirTbl1); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + boolean exceptionThrown = false; + try { + client.dropTable(dbName, tblName); + } catch (Exception e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + } + + @Test + public void truncateTableWithDifferentEncryptionZones() throws Throwable { + String dbName1 = "encrdbtrunc1"; + String dbName2 = "encrdbtrunc2"; + String tblName1 = "encrtbl1"; + String tblName2 = "encrtbl2"; + String typeName = "Person"; + + silentDropDatabase(dbName1); + silentDropDatabase(dbName2); + new DatabaseBuilder() + .setName(dbName1) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + new DatabaseBuilder() + .setName(dbName2) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + Path dirDb1 = new Path(warehouse.getWhRoot(), dbName1 +".db"); + warehouseFs.delete(dirDb1, true); + warehouseFs.mkdirs(dirDb1); + shimCm.createEncryptionZone(dirDb1, "test_key_db"); + Path dirTbl1 = new Path(dirDb1, tblName1); + warehouseFs.mkdirs(dirTbl1); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + Path dirDb2 = new Path(warehouse.getWhRoot(), dbName2 +".db"); + warehouseFs.delete(dirDb2, true); + warehouseFs.mkdirs(dirDb2); + shimCm.createEncryptionZone(dirDb2, "test_key_db"); + Path dirTbl2 = new Path(dirDb2, tblName2); + warehouseFs.mkdirs(dirTbl2); + Path part12 = new Path(dirTbl2, "part1"); + createFile(part12, "testClearer12"); + + new TableBuilder() + .setDbName(dbName1) + .setTableName(tblName1) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName1, tblName1); + Assert.assertNotNull(tbl); + + new TableBuilder() + .setDbName(dbName2) + .setTableName(tblName2) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + boolean exceptionThrown = false; + try { + client.truncateTable(dbName1, tblName1, null); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + assertNotNull(client.getTable(dbName1, tblName1)); + exceptionThrown = false; + try { + client.truncateTable(dbName2, tblName2, null); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part12)); + assertNotNull(client.getTable(dbName2, tblName2)); + } + + @Test + public void truncateTableWithDifferentEncryptionZonesDifferentKey() throws Throwable { + String dbName1 = "encrdb1"; + String dbName2 = "encrdb2"; + String tblName1 = "encrtbl1"; + String tblName2 = "encrtbl2"; + String typeName = "Person"; + + silentDropDatabase(dbName1); + silentDropDatabase(dbName2); + new DatabaseBuilder() + .setName(dbName1) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + new DatabaseBuilder() + .setName(dbName2) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + Path dirDb1 = new Path(warehouse.getWhRoot(), dbName1 +".db"); + warehouseFs.mkdirs(dirDb1); + shimCm.createEncryptionZone(dirDb1, "test_key_db"); + Path dirTbl1 = new Path(dirDb1, tblName1); + warehouseFs.mkdirs(dirTbl1); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + Path dirDb2 = new Path(warehouse.getWhRoot(), dbName2 +".db"); + warehouseFs.mkdirs(dirDb2); + shimCm.createEncryptionZone(dirDb2, "test_key_db"); + Path dirTbl2 = new Path(dirDb2, tblName2); + warehouseFs.mkdirs(dirTbl2); + Path part12 = new Path(dirTbl2, "part1"); + createFile(part12, "testClearer12"); + + new TableBuilder() + .setDbName(dbName1) + .setTableName(tblName1) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName1, tblName1); + Assert.assertNotNull(tbl); + + new TableBuilder() + .setDbName(dbName2) + .setTableName(tblName2) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + boolean exceptionThrown = false; + try { + client.truncateTable(dbName1, tblName1, null); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + assertNotNull(client.getTable(dbName1, tblName1)); + exceptionThrown = false; + try { + client.truncateTable(dbName2, tblName2, null); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part12)); + assertNotNull(client.getTable(dbName2, tblName2)); + } + + @Test + public void truncateTableWithSameEncryptionZones() throws Throwable { + String dbName = "encrdb9"; + String tblName1 = "encrtbl1"; + String tblName2 = "encrtbl2"; + String typeName = "Person"; + client.dropTable(dbName, tblName1); + client.dropTable(dbName, tblName2); + silentDropDatabase(dbName); + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName1) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl = client.getTable(dbName, tblName1); + Assert.assertNotNull(tbl); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName2) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.delete(dirDb, true); + warehouseFs.mkdirs(dirDb); + shimCm.createEncryptionZone(dirDb, "test_key_db"); + Path dirTbl1 = new Path(dirDb, tblName1); + warehouseFs.mkdirs(dirTbl1); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + Path dirTbl2 = new Path(dirDb, tblName2); + warehouseFs.mkdirs(dirTbl2); + Path part12 = new Path(dirTbl2, "part1"); + createFile(part12, "testClearer12"); + + boolean exceptionThrown = false; + try { + client.truncateTable(dbName, tblName1, null); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName1); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + + try { + client.truncateTable(dbName, tblName2, null); + } catch (MetaException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part12)); + try { + client.getTable(dbName, tblName2); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + } + + @Test + public void truncateTableWithoutEncryptionZonesForCm() throws Throwable { + String dbName = "simpdb3"; + String tblName = "simptbl"; + String typeName = "Person"; + client.dropTable(dbName, tblName); + silentDropDatabase(dbName); + + new DatabaseBuilder() + .setName(dbName) + .addParam("repl.source.for", "1, 2, 3") + .create(client, hiveConf); + + client.dropType(typeName); + Type typ1 = new Type(); + typ1.setName(typeName); + typ1.setFields(new ArrayList<>(2)); + typ1.getFields().add( + new FieldSchema("name", ColumnType.STRING_TYPE_NAME, "")); + typ1.getFields().add( + new FieldSchema("income", ColumnType.INT_TYPE_NAME, "")); + client.createType(typ1); + + new TableBuilder() + .setDbName(dbName) + .setTableName(tblName) + .setCols(typ1.getFields()) + .setNumBuckets(1) + .addBucketCol("name") + .addStorageDescriptorParam("test_param_1", "Use this for comments etc") + .create(client, hiveConf); + + Table tbl2 = client.getTable(dbName, tblName); + Assert.assertNotNull(tbl2); + + Path dirDb = new Path(warehouse.getWhRoot(), dbName +".db"); + warehouseFs.mkdirs(dirDb); + Path dirTbl1 = new Path(dirDb, tblName); + warehouseFs.mkdirs(dirTbl1); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + boolean exceptionThrown = false; + try { + client.truncateTable(dbName, tblName, null); + } catch (Exception e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + assertFalse(warehouseFs.exists(part11)); + try { + client.getTable(dbName, tblName); + } catch (NoSuchObjectException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + } + + @Test + public void recycleFailureWithDifferentEncryptionZonesForCm() throws Throwable { + Path dirDb = new Path(warehouse.getWhRoot(), "db2"); + warehouseFs.delete(dirDb, true); + warehouseFs.mkdirs(dirDb); + Path dirTbl1 = new Path(dirDb, "tbl1"); + warehouseFs.mkdirs(dirTbl1); + shimCm.createEncryptionZone(dirTbl1, "test_key_db"); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + boolean exceptionThrown = false; + try { + ReplChangeManager.getInstance(hiveConf).recycle(dirTbl1, RecycleType.MOVE, false); + } catch (RemoteException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + } + + @Test + public void testClearerEncrypted() throws Exception { + HiveConf hiveConfCmClearer = new HiveConf(TestReplChangeManager.class); + hiveConfCmClearer.setBoolean(HiveConf.ConfVars.REPLCMENABLED.varname, true); + hiveConfCmClearer.setInt(CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY, 60); + hiveConfCmClearer.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, + "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort() + + HiveConf.ConfVars.METASTOREWAREHOUSE.defaultStrVal); + + String cmrootCmClearer = "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort() + "/cmrootClearer"; + hiveConfCmClearer.set(HiveConf.ConfVars.REPLCMDIR.varname, cmrootCmClearer); + Warehouse warehouseCmClearer = new Warehouse(hiveConfCmClearer); + FileSystem cmfs = new Path(cmrootCmClearer).getFileSystem(hiveConfCmClearer); + cmfs.mkdirs(warehouseCmClearer.getWhRoot()); + + HadoopShims.HdfsEncryptionShim shimCmEncrypted = ShimLoader.getHadoopShims().createHdfsEncryptionShim(cmfs, conf); + + FileSystem fsWarehouse = warehouseCmClearer.getWhRoot().getFileSystem(hiveConfCmClearer); + long now = System.currentTimeMillis(); + Path dirDb = new Path(warehouseCmClearer.getWhRoot(), "db1"); + fsWarehouse.delete(dirDb, true); + fsWarehouse.mkdirs(dirDb); + Path dirTbl1 = new Path(dirDb, "tbl1"); + fsWarehouse.mkdirs(dirTbl1); + shimCmEncrypted.createEncryptionZone(dirTbl1, "test_key_db"); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + String fileChksum11 = ReplChangeManager.checksumFor(part11, fsWarehouse); + Path part12 = new Path(dirTbl1, "part2"); + createFile(part12, "testClearer12"); + String fileChksum12 = ReplChangeManager.checksumFor(part12, fsWarehouse); + Path dirTbl2 = new Path(dirDb, "tbl2"); + fsWarehouse.mkdirs(dirTbl2); + shimCmEncrypted.createEncryptionZone(dirTbl2, "test_key_db"); + Path part21 = new Path(dirTbl2, "part1"); + createFile(part21, "testClearer21"); + String fileChksum21 = ReplChangeManager.checksumFor(part21, fsWarehouse); + Path part22 = new Path(dirTbl2, "part2"); + createFile(part22, "testClearer22"); + String fileChksum22 = ReplChangeManager.checksumFor(part22, fsWarehouse); + Path dirTbl3 = new Path(dirDb, "tbl3"); + fsWarehouse.mkdirs(dirTbl3); + shimCmEncrypted.createEncryptionZone(dirTbl3, "test_key_cm"); + Path part31 = new Path(dirTbl3, "part1"); + createFile(part31, "testClearer31"); + String fileChksum31 = ReplChangeManager.checksumFor(part31, fsWarehouse); + Path part32 = new Path(dirTbl3, "part2"); + createFile(part32, "testClearer32"); + String fileChksum32 = ReplChangeManager.checksumFor(part32, fsWarehouse); + + ReplChangeManager.getInstance(hiveConfCmClearer).recycle(dirTbl1, RecycleType.MOVE, false); + ReplChangeManager.getInstance(hiveConfCmClearer).recycle(dirTbl2, RecycleType.MOVE, false); + ReplChangeManager.getInstance(hiveConfCmClearer).recycle(dirTbl3, RecycleType.MOVE, true); + + assertTrue(fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part11.getName(), fileChksum11, + ReplChangeManager.getCmRoot(part11).toString()))); + assertTrue(fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part12.getName(), fileChksum12, + ReplChangeManager.getCmRoot(part12).toString()))); + assertTrue(fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part21.getName(), fileChksum21, + ReplChangeManager.getCmRoot(part21).toString()))); + assertTrue(fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part22.getName(), fileChksum22, + ReplChangeManager.getCmRoot(part22).toString()))); + assertTrue(fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part31.getName(), fileChksum31, + ReplChangeManager.getCmRoot(part31).toString()))); + assertTrue(fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part32.getName(), fileChksum32, + ReplChangeManager.getCmRoot(part32).toString()))); + + fsWarehouse.setTimes(ReplChangeManager.getCMPath(hiveConfCmClearer, part11.getName(), fileChksum11, + ReplChangeManager.getCmRoot(part11).toString()), + now - 86400*1000*2, now - 86400*1000*2); + fsWarehouse.setTimes(ReplChangeManager.getCMPath(hiveConfCmClearer, part21.getName(), fileChksum21, + ReplChangeManager.getCmRoot(part21).toString()), + now - 86400*1000*2, now - 86400*1000*2); + fsWarehouse.setTimes(ReplChangeManager.getCMPath(hiveConfCmClearer, part31.getName(), fileChksum31, + ReplChangeManager.getCmRoot(part31).toString()), + now - 86400*1000*2, now - 86400*1000*2); + fsWarehouse.setTimes(ReplChangeManager.getCMPath(hiveConfCmClearer, part32.getName(), fileChksum32, + ReplChangeManager.getCmRoot(part32).toString()), + now - 86400*1000*2, now - 86400*1000*2); + + ReplChangeManager.scheduleCMClearer(hiveConfCmClearer); + + long start = System.currentTimeMillis(); + long end; + boolean cleared = false; + do { + Thread.sleep(200); + end = System.currentTimeMillis(); + if (end - start > 5000) { + Assert.fail("timeout, cmroot has not been cleared"); + } + if (!fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part11.getName(), fileChksum11, + ReplChangeManager.getCmRoot(part11).toString())) && + fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part12.getName(), fileChksum12, + ReplChangeManager.getCmRoot(part12).toString())) && + !fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part21.getName(), fileChksum21, + ReplChangeManager.getCmRoot(part21).toString())) && + fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part22.getName(), fileChksum22, + ReplChangeManager.getCmRoot(part22).toString())) && + !fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part31.getName(), fileChksum31, + ReplChangeManager.getCmRoot(part31).toString())) && + !fsWarehouse.exists(ReplChangeManager.getCMPath(hiveConfCmClearer, part32.getName(), fileChksum32, + ReplChangeManager.getCmRoot(part32).toString()))) { + cleared = true; + } + } while (!cleared); + } + + @Test + public void testCmrootEncrypted() throws Exception { + HiveConf encryptedHiveConf = new HiveConf(TestReplChangeManager.class); + encryptedHiveConf.setBoolean(HiveConf.ConfVars.REPLCMENABLED.varname, true); + encryptedHiveConf.setInt(CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY, 60); + encryptedHiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, + "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort() + + HiveConf.ConfVars.METASTOREWAREHOUSE.defaultStrVal); + + String cmrootdirEncrypted = "hdfs://" + miniDFSCluster.getNameNode().getHostAndPort() + "/cmroot"; + encryptedHiveConf.set(HiveConf.ConfVars.REPLCMDIR.varname, cmrootdirEncrypted); + encryptedHiveConf.set(HiveConf.ConfVars.REPLCMFALLBACKNONENCRYPTEDDIR.varname, cmrootFallBack); + + //Create cm in encrypted zone + HadoopShims.HdfsEncryptionShim shimCmEncrypted = ShimLoader.getHadoopShims().createHdfsEncryptionShim(fs, conf); + shimCmEncrypted.createEncryptionZone(new Path(cmrootdirEncrypted), "test_key_db"); + ReplChangeManager.resetReplChangeManagerInstance(); + Warehouse warehouseEncrypted = new Warehouse(encryptedHiveConf); + FileSystem warehouseFsEncrypted = warehouseEncrypted.getWhRoot().getFileSystem(encryptedHiveConf); + FileSystem fsCmEncrypted = new Path(cmrootdirEncrypted).getFileSystem(encryptedHiveConf); + fsCmEncrypted.mkdirs(warehouseEncrypted.getWhRoot()); + + Path dirDb = new Path(warehouseEncrypted.getWhRoot(), "db3"); + warehouseFsEncrypted.delete(dirDb, true); + warehouseFsEncrypted.mkdirs(dirDb); + Path dirTbl1 = new Path(dirDb, "tbl1"); + warehouseFsEncrypted.mkdirs(dirTbl1); + shimCmEncrypted.createEncryptionZone(dirTbl1, "test_key_db"); + Path part11 = new Path(dirTbl1, "part1"); + createFile(part11, "testClearer11"); + + boolean exceptionThrown = false; + try { + ReplChangeManager.getInstance(encryptedHiveConf).recycle(dirTbl1, RecycleType.MOVE, false); + } catch (RemoteException e) { + exceptionThrown = true; + assertTrue(e.getMessage().contains("can't be moved from encryption zone")); + } + assertFalse(exceptionThrown); + + Path dirDbUnEncrypted = new Path(warehouseEncrypted.getWhRoot(), "db3en"); + warehouseFsEncrypted.delete(dirDbUnEncrypted, true); + warehouseFsEncrypted.mkdirs(dirDbUnEncrypted); + Path dirTblun1 = new Path(dirDbUnEncrypted, "tbl1"); + warehouseFsEncrypted.mkdirs(dirTblun1); + Path partun11 = new Path(dirTblun1, "part1"); + createFile(partun11, "testClearer11"); + + exceptionThrown = false; + try { + ReplChangeManager.getInstance(encryptedHiveConf).recycle(dirDbUnEncrypted, RecycleType.MOVE, false); + } catch (IOException e) { + exceptionThrown = true; + } + assertFalse(exceptionThrown); + ReplChangeManager.resetReplChangeManagerInstance(); + initReplChangeManager(); + } + + + private void createFile(Path path, String content) throws IOException { + FSDataOutputStream output = path.getFileSystem(hiveConf).create(path); + output.writeChars(content); + output.close(); + } + + private void silentDropDatabase(String dbName) throws TException { + try { + for (String tableName : client.getTables(dbName, "*")) { + client.dropTable(dbName, tableName); + } + client.dropDatabase(dbName); + } catch (NoSuchObjectException|InvalidOperationException e) { + // NOP + } + } +} diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java index 5ab4f91..d3891cf 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java @@ -271,6 +271,7 @@ public class TestReplChangeManager { FileSystem fs = warehouse.getWhRoot().getFileSystem(hiveConf); long now = System.currentTimeMillis(); Path dirDb = new Path(warehouse.getWhRoot(), "db3"); + fs.delete(dirDb, true); fs.mkdirs(dirDb); Path dirTbl1 = new Path(dirDb, "tbl1"); fs.mkdirs(dirTbl1); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java index 562b2c9..19d38d2 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStoreUpdateUsingEvents.java @@ -48,6 +48,8 @@ public class TestCachedStoreUpdateUsingEvents { MetastoreConf.setBoolVar(conf, ConfVars.METASTORE_CACHE_CAN_USE_EVENT, true); MetastoreConf.setBoolVar(conf, ConfVars.HIVE_TXN_STATS_ENABLED, true); MetastoreConf.setBoolVar(conf, ConfVars.AGGREGATE_STATS_CACHE_ENABLED, false); + MetastoreConf.setBoolVar(conf, ConfVars.REPLCMENABLED, true); + MetastoreConf.setVar(conf, ConfVars.REPLCMDIR, "cmroot"); MetaStoreTestUtils.setConfForStandloneMode(conf); hmsHandler = new HiveMetaStore.HMSHandler("testCachedStore", conf, true); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/metadata/TestAlterTableMetadata.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/metadata/TestAlterTableMetadata.java index f6035fa..96aeb0f 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/metadata/TestAlterTableMetadata.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/metadata/TestAlterTableMetadata.java @@ -63,5 +63,6 @@ public class TestAlterTableMetadata { table = Hive.get(conf).getTable("t1"); assertEquals(PrincipalType.ROLE, table.getOwnerType()); assertEquals("r1", table.getOwner()); + } } diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java index 92a0bbe..7e0a7f2 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java @@ -202,6 +202,8 @@ public class TestJdbcDriver2 { System.setProperty(ConfVars.HIVE_AUTHORIZATION_MANAGER.varname, "org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider"); System.setProperty(ConfVars.HIVE_SERVER2_PARALLEL_OPS_IN_SESSION.varname, "false"); + System.setProperty(ConfVars.REPLCMENABLED.varname, "true"); + System.setProperty(ConfVars.REPLCMDIR.varname, "cmroot"); con = getConnection(defaultDbName + ";create=true"); Statement stmt = con.createStatement(); assertNotNull("Statement is null", stmt); @@ -2828,6 +2830,8 @@ public class TestJdbcDriver2 { stmt.execute("set hive.metastore.transactional.event.listeners =" + " org.apache.hive.hcatalog.listener.DbNotificationListener"); stmt.execute("set hive.metastore.dml.events = true"); + stmt.execute("set hive.repl.cm.enabled = true"); + stmt.execute("set hive.repl.cmrootdir = cmroot"); stmt.execute("create database " + primaryDb + " with dbproperties('repl.source.for'='1,2,3')"); stmt.execute("create table " + primaryTblName + " (id int)"); stmt.execute("insert into " + primaryTblName + " values (1), (2)"); diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java index a3299ee..79beadd 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java @@ -106,6 +106,7 @@ public class TestJdbcWithMiniHS2 { HiveConf conf = new HiveConf(); dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); kvDataFilePath = new Path(dataFileDir, "kv1.txt"); + try { startMiniHS2(conf); } catch (Exception e) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java index cf4bc81..93c0209 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableOperation.java @@ -67,7 +67,6 @@ public class CreateTableOperation extends DDLOperation<CreateTableDesc> { if (desc.getReplicationSpec().allowEventReplacementInto(existingTable.getParameters())) { desc.setReplaceMode(true); // we replace existing table. ReplicationSpec.copyLastReplId(existingTable.getParameters(), tbl.getParameters()); - // If location of an existing managed table is changed, then need to delete the old location if exists. // This scenario occurs when a managed table is converted into external table at source. In this case, // at target, the table data would be moved to different location under base directory for external tables. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveOperation.java index 5c5dec4..248fe0f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveOperation.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.ddl.table.storage; +import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.ql.ddl.DDLOperationContext; import org.apache.hadoop.hive.ql.exec.ArchiveUtils; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -308,7 +309,8 @@ public class AlterTableArchiveOperation extends DDLOperation<AlterTableArchiveDe private void deleteIntermediateOriginalDir(Table table, Path intermediateOriginalDir) throws HiveException { if (HdfsUtils.pathExists(intermediateOriginalDir, context.getConf())) { - AlterTableArchiveUtils.deleteDir(intermediateOriginalDir, context.getDb().getDatabase(table.getDbName()), + AlterTableArchiveUtils.deleteDir(intermediateOriginalDir, + ReplChangeManager.shouldEnableCm(context.getDb().getDatabase(table.getDbName()), table.getTTable()), context.getConf()); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveUtils.java index ebac6f6..c285405 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableArchiveUtils.java @@ -25,7 +25,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.Warehouse; -import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.ArchiveUtils; @@ -95,10 +94,10 @@ final class AlterTableArchiveUtils { return new Path(dir.getParent(), dir.getName() + intermediateDirSuffix); } - static void deleteDir(Path dir, Database db, Configuration conf) throws HiveException { + static void deleteDir(Path dir, boolean shouldEnableCm, Configuration conf) throws HiveException { try { Warehouse wh = new Warehouse(conf); - wh.deleteDir(dir, true, db); + wh.deleteDir(dir, true, false, shouldEnableCm); } catch (MetaException e) { throw new HiveException(e); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableSetLocationOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableSetLocationOperation.java index 22a29d7..509d577 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableSetLocationOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableSetLocationOperation.java @@ -43,6 +43,7 @@ public class AlterTableSetLocationOperation extends AbstractAlterTableOperation< protected void doAlteration(Table table, Partition partition) throws HiveException { StorageDescriptor sd = getStorageDescriptor(table, partition); String newLocation = desc.getLocation(); + try { URI locUri = new URI(newLocation); if (!new Path(locUri).isAbsolute()) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableUnarchiveOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableUnarchiveOperation.java index 4f791a3..39416ed 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableUnarchiveOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/AlterTableUnarchiveOperation.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.ddl.table.storage; +import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.ql.ddl.DDLOperationContext; import org.apache.hadoop.hive.ql.exec.ArchiveUtils; import org.apache.hadoop.hive.ql.exec.ArchiveUtils.PartSpecInfo; @@ -282,8 +283,9 @@ public class AlterTableUnarchiveOperation extends DDLOperation<AlterTableUnarchi private void deleteIntermediateArchivedDir(Table table, Path intermediateArchivedDir) throws HiveException { if (HdfsUtils.pathExists(intermediateArchivedDir, context.getConf())) { - AlterTableArchiveUtils.deleteDir(intermediateArchivedDir, context.getDb().getDatabase(table.getDbName()), - context.getConf()); + AlterTableArchiveUtils.deleteDir(intermediateArchivedDir, + ReplChangeManager.shouldEnableCm(context.getDb().getDatabase(table.getDbName()), table.getTTable()), + context.getConf()); } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 7f061d4..945eafc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -2369,7 +2369,7 @@ public class Hive { // base_x. (there is Insert Overwrite and Load Data Overwrite) boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge")); boolean needRecycle = !tbl.isTemporary() - && ReplChangeManager.isSourceOfReplication(Hive.get().getDatabase(tbl.getDbName())); + && ReplChangeManager.shouldEnableCm(Hive.get().getDatabase(tbl.getDbName()), tbl.getTTable()); replaceFiles(tbl.getPath(), loadPath, destPath, oldPartPath, getConf(), isSrcLocal, isAutoPurge, newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, needRecycle, isManaged, isInsertOverwrite); } else { @@ -3123,7 +3123,7 @@ private void constructOneLBLocationMap(FileStatus fSta, //for fullAcid we don't want to delete any files even for OVERWRITE see HIVE-14988/HIVE-17361 boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge")); boolean needRecycle = !tbl.isTemporary() - && ReplChangeManager.isSourceOfReplication(Hive.get().getDatabase(tbl.getDbName())); + && ReplChangeManager.shouldEnableCm(Hive.get().getDatabase(tbl.getDbName()), tbl.getTTable()); replaceFiles(tblPath, loadPath, destPath, tblPath, conf, isSrcLocal, isAutopurge, newFiles, FileUtils.HIDDEN_FILES_PATH_FILTER, needRecycle, isManaged, isInsertOverwrite); } else { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 7b6ce10..dd97f3d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -467,7 +467,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { needRecycle = false; } else { org.apache.hadoop.hive.metastore.api.Database db = x.getHive().getDatabase(table.getDbName()); - needRecycle = db != null && ReplChangeManager.isSourceOfReplication(db); + needRecycle = db != null && ReplChangeManager.shouldEnableCm(db, table.getTTable()); } } else { if (AcidUtils.isTransactionalTable(table) && !replicationSpec.isInReplicationScope()) { @@ -613,7 +613,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { needRecycle = false; } else { org.apache.hadoop.hive.metastore.api.Database db = x.getHive().getDatabase(table.getDbName()); - needRecycle = db != null && ReplChangeManager.isSourceOfReplication(db); + needRecycle = db != null && ReplChangeManager.shouldEnableCm(db, table.getTTable()); } } else { loadFileType = replicationSpec.isReplace() ? diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index 9ba2b24..54b616e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -260,11 +260,11 @@ public class Cleaner extends MetaStoreCompactorThread { FileSystem fs = filesToDelete.get(0).getFileSystem(conf); Database db = getMSForConf(conf).getDatabase(getDefaultCatalog(conf), ci.dbname); - boolean isSourceOfRepl = ReplChangeManager.isSourceOfReplication(db); + Table table = getMSForConf(conf).getTable(getDefaultCatalog(conf), ci.dbname, ci.tableName); for (Path dead : filesToDelete) { LOG.debug("Going to delete path " + dead.toString()); - if (isSourceOfRepl) { + if (ReplChangeManager.shouldEnableCm(db, table)) { replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, true); } fs.delete(dead, true); diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java index 23e7d5e..2eafef0 100644 --- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java +++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java @@ -1242,7 +1242,7 @@ public class Hadoop23Shims extends HadoopShimsSecure { return (getEncryptionZoneForPath(fullPath) != null); } - private EncryptionZone getEncryptionZoneForPath(Path path) throws IOException { + public EncryptionZone getEncryptionZoneForPath(Path path) throws IOException { if (path.getFileSystem(conf).exists(path)) { return hdfsAdmin.getEncryptionZoneForPath(path); } else if (!path.getParent().equals(path)) { diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java index 49a2ab3..f71f5a5 100644 --- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java +++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java @@ -42,6 +42,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.protocol.EncryptionZone; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapred.ClusterStatus; import org.apache.hadoop.mapred.JobConf; @@ -564,7 +565,7 @@ public interface HadoopShims { public int comparePathKeyStrength(Path path1, Path path2) throws IOException; /** - * create encryption zone by path and keyname + * Create encryption zone by path and keyname. * @param path HDFS path to create encryption zone * @param keyName keyname * @throws IOException @@ -573,6 +574,13 @@ public interface HadoopShims { public void createEncryptionZone(Path path, String keyName) throws IOException; /** + * Get encryption zone by path. + * @param path HDFS path to create encryption zone. + * @throws IOException + */ + EncryptionZone getEncryptionZoneForPath(Path path) throws IOException; + + /** * Creates an encryption key. * * @param keyName Name of the key @@ -625,6 +633,11 @@ public interface HadoopShims { } @Override + public EncryptionZone getEncryptionZoneForPath(Path path) throws IOException { + return null; + } + + @Override public void createKey(String keyName, int bitLength) { /* not supported */ } diff --git a/standalone-metastore/metastore-common/pom.xml b/standalone-metastore/metastore-common/pom.xml index e252f12..81dc6b6 100644 --- a/standalone-metastore/metastore-common/pom.xml +++ b/standalone-metastore/metastore-common/pom.xml @@ -256,6 +256,11 @@ <artifactId>mockito-core</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-shims</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> <profiles> diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java index c6acc57..1041d92 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java @@ -19,11 +19,13 @@ package org.apache.hadoop.hive.metastore; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileChecksum; @@ -36,11 +38,18 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; import org.apache.hadoop.hive.metastore.utils.FileUtils; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.utils.Retry; import org.apache.hadoop.hive.metastore.utils.StringUtils; +import org.apache.hadoop.hive.shims.HadoopShims; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.hive.shims.HadoopShims.HdfsEncryptionShim; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +59,8 @@ public class ReplChangeManager { private static boolean inited = false; private static boolean enabled = false; - private static Path cmroot; + private static Map<String, String> encryptionZones = new HashMap<>(); + private static HadoopShims hadoopShims = ShimLoader.getHadoopShims(); private static Configuration conf; private String msUser; private String msGroup; @@ -61,6 +71,10 @@ public class ReplChangeManager { public static final String SOURCE_OF_REPLICATION = "repl.source.for"; private static final String TXN_WRITE_EVENT_FILE_SEPARATOR = "]"; static final String CM_THREAD_NAME_PREFIX = "cmclearer-"; + private static final String NO_ENCRYPTION = "noEncryption"; + private static String cmRootDir; + private static String encryptedCmRootDir; + private static String fallbackNonEncryptedCmRootDir; public enum RecycleType { MOVE, @@ -138,14 +152,27 @@ public class ReplChangeManager { if (!inited) { if (MetastoreConf.getBoolVar(conf, ConfVars.REPLCMENABLED)) { ReplChangeManager.enabled = true; - ReplChangeManager.cmroot = new Path(MetastoreConf.getVar(conf, ConfVars.REPLCMDIR)); ReplChangeManager.conf = conf; - - FileSystem cmFs = cmroot.getFileSystem(conf); - // Create cmroot with permission 700 if not exist - if (!cmFs.exists(cmroot)) { - cmFs.mkdirs(cmroot); - cmFs.setPermission(cmroot, new FsPermission("700")); + cmRootDir = MetastoreConf.getVar(conf, ConfVars.REPLCMDIR); + encryptedCmRootDir = MetastoreConf.getVar(conf, ConfVars.REPLCMENCRYPTEDDIR); + fallbackNonEncryptedCmRootDir = MetastoreConf.getVar(conf, ConfVars.REPLCMFALLBACKNONENCRYPTEDDIR); + //Create default cm root + Path cmroot = new Path(cmRootDir); + createCmRoot(cmroot); + FileSystem cmRootFs = cmroot.getFileSystem(conf); + HdfsEncryptionShim pathEncryptionShim = hadoopShims + .createHdfsEncryptionShim(cmRootFs, conf); + Path cmRootEncrypted = new Path(encryptedCmRootDir); + if (cmRootEncrypted.isAbsolute()) { + throw new MetaException(ConfVars.REPLCMENCRYPTEDDIR.getHiveName() + " should be a relative path"); + } + if (pathEncryptionShim.isPathEncrypted(cmroot)) { + //If cm root is encrypted we keep using it for the encryption zone + String encryptionZonePath = cmRootFs.getUri() + + pathEncryptionShim.getEncryptionZoneForPath(cmroot).getPath(); + encryptionZones.put(encryptionZonePath, cmRootDir); + } else { + encryptionZones.put(NO_ENCRYPTION, cmRootDir); } UserGroupInformation usergroupInfo = UserGroupInformation.getCurrentUser(); msUser = usergroupInfo.getShortUserName(); @@ -194,7 +221,7 @@ public class ReplChangeManager { } } else { String fileCheckSum = checksumFor(path, fs); - Path cmPath = getCMPath(conf, path.getName(), fileCheckSum, cmroot.toString()); + Path cmPath = getCMPath(conf, path.getName(), fileCheckSum, getCmRoot(path).toString()); // set timestamp before moving to cmroot, so we can // avoid race condition CM remove the file before setting @@ -213,9 +240,18 @@ public class ReplChangeManager { switch (type) { case MOVE: { LOG.info("Moving {} to {}", path.toString(), cmPath.toString()); - // Rename fails if the file with same name already exist. - success = fs.rename(path, cmPath); + Retry<Boolean> retriable = new Retry<Boolean>(IOException.class) { + @Override + public Boolean execute() throws IOException { + return fs.rename(path, cmPath); + } + }; + try { + success = retriable.run(); + } catch (Exception e) { + throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e)); + } break; } case COPY: { @@ -361,9 +397,10 @@ public class ReplChangeManager { throw new IllegalStateException("Uninitialized ReplChangeManager instance."); } String encodedUri = fileUriStr; - if ((fileChecksum != null) && (cmroot != null)) { + Path cmRoot = getCmRoot(new Path(fileUriStr)); + if ((fileChecksum != null) && (cmRoot != null)) { encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + fileChecksum - + URI_FRAGMENT_SEPARATOR + FileUtils.makeQualified(cmroot, conf); + + URI_FRAGMENT_SEPARATOR + FileUtils.makeQualified(cmRoot, conf); } else { encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + URI_FRAGMENT_SEPARATOR; } @@ -404,12 +441,12 @@ public class ReplChangeManager { * Thread to clear old files of cmroot recursively */ static class CMClearer implements Runnable { - private Path cmroot; + private Map<String, String> encryptionZones; private long secRetain; private Configuration conf; - CMClearer(String cmrootString, long secRetain, Configuration conf) { - this.cmroot = new Path(cmrootString); + CMClearer(Map<String, String> encryptionZones, long secRetain, Configuration conf) { + this.encryptionZones = encryptionZones; this.secRetain = secRetain; this.conf = conf; } @@ -418,32 +455,34 @@ public class ReplChangeManager { public void run() { try { LOG.info("CMClearer started"); - - long now = System.currentTimeMillis(); - FileSystem fs = cmroot.getFileSystem(conf); - FileStatus[] files = fs.listStatus(cmroot); - - for (FileStatus file : files) { - long modifiedTime = file.getModificationTime(); - if (now - modifiedTime > secRetain*1000) { - try { - if (fs.getXAttrs(file.getPath()).containsKey(REMAIN_IN_TRASH_TAG)) { - boolean succ = Trash.moveToAppropriateTrash(fs, file.getPath(), conf); - if (succ) { - LOG.debug("Move " + file.toString() + " to trash"); + for (String cmrootString : encryptionZones.values()) { + Path cmroot = new Path(cmrootString); + long now = System.currentTimeMillis(); + FileSystem fs = cmroot.getFileSystem(conf); + FileStatus[] files = fs.listStatus(cmroot); + + for (FileStatus file : files) { + long modifiedTime = file.getModificationTime(); + if (now - modifiedTime > secRetain * 1000) { + try { + if (fs.getXAttrs(file.getPath()).containsKey(REMAIN_IN_TRASH_TAG)) { + boolean succ = Trash.moveToAppropriateTrash(fs, file.getPath(), conf); + if (succ) { + LOG.debug("Move " + file.toString() + " to trash"); + } else { + LOG.warn("Fail to move " + file.toString() + " to trash"); + } } else { - LOG.warn("Fail to move " + file.toString() + " to trash"); - } - } else { - boolean succ = fs.delete(file.getPath(), false); - if (succ) { - LOG.debug("Remove " + file.toString()); - } else { - LOG.warn("Fail to remove " + file.toString()); + boolean succ = fs.delete(file.getPath(), false); + if (succ) { + LOG.debug("Remove " + file.toString()); + } else { + LOG.warn("Fail to remove " + file.toString()); + } } + } catch (UnsupportedOperationException e) { + LOG.warn("Error getting xattr for " + file.getPath().toString()); } - } catch (UnsupportedOperationException e) { - LOG.warn("Error getting xattr for " + file.getPath().toString()); } } } @@ -461,12 +500,17 @@ public class ReplChangeManager { .namingPattern(CM_THREAD_NAME_PREFIX + "%d") .daemon(true) .build()); - executor.scheduleAtFixedRate(new CMClearer(MetastoreConf.getVar(conf, ConfVars.REPLCMDIR), - MetastoreConf.getTimeVar(conf, ConfVars.REPLCMRETIAN, TimeUnit.SECONDS), conf), - 0, MetastoreConf.getTimeVar(conf, ConfVars.REPLCMINTERVAL, TimeUnit.SECONDS), TimeUnit.SECONDS); + executor.scheduleAtFixedRate(new CMClearer(encryptionZones, + MetastoreConf.getTimeVar(conf, ConfVars.REPLCMRETIAN, TimeUnit.SECONDS), conf), + 0, MetastoreConf.getTimeVar(conf, ConfVars.REPLCMINTERVAL, TimeUnit.SECONDS), TimeUnit.SECONDS); } } + public static boolean shouldEnableCm(Database db, Table table) { + assert (table != null); + return isSourceOfReplication(db) && !MetaStoreUtils.isExternalTable(table); + } + public static boolean isSourceOfReplication(Database db) { assert (db != null); String replPolicyIds = getReplPolicyIdString(db); @@ -493,4 +537,63 @@ public class ReplChangeManager { public static String[] getListFromSeparatedString(String commaSeparatedString) { return commaSeparatedString.split("\\s*" + TXN_WRITE_EVENT_FILE_SEPARATOR + "\\s*"); } + + @VisibleForTesting + static Path getCmRoot(Path path) throws IOException { + Path cmroot = null; + //Default path if hive.repl.cm dir is encrypted + String cmrootDir = fallbackNonEncryptedCmRootDir; + String encryptionZonePath = NO_ENCRYPTION; + if (enabled) { + HdfsEncryptionShim pathEncryptionShim = hadoopShims.createHdfsEncryptionShim(path.getFileSystem(conf), conf); + if (pathEncryptionShim.isPathEncrypted(path)) { + encryptionZonePath = path.getFileSystem(conf).getUri() + + pathEncryptionShim.getEncryptionZoneForPath(path).getPath(); + //For encryption zone, create cm at the relative path specified by hive.repl.cm.encryptionzone.rootdir + //at the root of the encryption zone + cmrootDir = encryptionZonePath + Path.SEPARATOR + encryptedCmRootDir; + } + if (encryptionZones.containsKey(encryptionZonePath)) { + cmroot = new Path(encryptionZones.get(encryptionZonePath)); + } else { + cmroot = new Path(cmrootDir); + synchronized (instance) { + if (!encryptionZones.containsKey(encryptionZonePath)) { + createCmRoot(cmroot); + encryptionZones.put(encryptionZonePath, cmrootDir); + } + } + } + } + return cmroot; + } + + private static void createCmRoot(Path cmroot) throws IOException { + FileSystem cmFs = cmroot.getFileSystem(conf); + // Create cmroot with permission 700 if not exist + if (!cmFs.exists(cmroot)) { + cmFs.mkdirs(cmroot); + cmFs.setPermission(cmroot, new FsPermission("700")); + } + } + + @VisibleForTesting + static void resetReplChangeManagerInstance() { + inited = false; + enabled = false; + instance = null; + encryptionZones.clear(); + } + + public static final PathFilter CMROOT_PATH_FILTER = new PathFilter() { + @Override + public boolean accept(Path p) { + if (enabled) { + String name = p.getName(); + return !name.contains(cmRootDir) && !name.contains(encryptedCmRootDir) + && !name.contains(fallbackNonEncryptedCmRootDir); + } + return true; + } + }; } diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java index bc87e8f..2aeb374 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java @@ -917,6 +917,11 @@ public class MetastoreConf { "This class is used to store and retrieval of raw metadata objects such as table, database"), REPLCMDIR("metastore.repl.cmrootdir", "hive.repl.cmrootdir", "/user/${system:user.name}/cmroot/", "Root dir for ChangeManager, used for deleted files."), + REPLCMENCRYPTEDDIR("metastore.repl.cm.encryptionzone.rootdir", "hive.repl.cm.encryptionzone.rootdir", ".cmroot", + "Root dir for ChangeManager if encryption zones are enabled, used for deleted files."), + REPLCMFALLBACKNONENCRYPTEDDIR("metastore.repl.cm.nonencryptionzone.rootdir", + "hive.repl.cm.nonencryptionzone.rootdir", "/user/${system:user.name}/cmroot/", + "Root dir for ChangeManager for non encrypted paths if hive.repl.cmrootdir is encrypted."), REPLCMRETIAN("metastore.repl.cm.retain", "hive.repl.cm.retain", 24, TimeUnit.HOURS, "Time to retain removed files in cmrootdir."), REPLCMINTERVAL("metastore.repl.cm.interval", "hive.repl.cm.interval", 3600, TimeUnit.SECONDS, diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/Retry.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/Retry.java new file mode 100644 index 0000000..bdb269a --- /dev/null +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/Retry.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.utils; + +/** + * Class to implement any retry logic in case of exceptions. + */ +public abstract class Retry<T> { + + public static final int MAX_RETRIES = 3; + private int tries = 0; + private Class retryExceptionType; + + public Retry(Class exceptionClassType) { + this.retryExceptionType = exceptionClassType; + } + + public abstract T execute() throws Exception; + + public T run() throws Exception { + try { + return execute(); + } catch(Exception e) { + if (e.getClass().equals(retryExceptionType)){ + tries++; + if (MAX_RETRIES == tries) { + throw e; + } else { + return run(); + } + } else { + throw e; + } + } + } +} diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/package-info.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/package-info.java new file mode 100644 index 0000000..2eb51c8 --- /dev/null +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package consisting the utility methods for metastore. + */ +package org.apache.hadoop.hive.metastore.utils; diff --git a/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/RetryTest.java b/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/RetryTest.java new file mode 100644 index 0000000..67bd658 --- /dev/null +++ b/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/RetryTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.metastore.utils; + +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests for retriable interface. + */ +public class RetryTest { + @Test + public void testRetrySuccess() { + Retry<Void> retriable = new Retry<Void>(NullPointerException.class) { + @Override + public Void execute() { + throw new NullPointerException(); + } + }; + try { + retriable.run(); + } catch (Exception e) { + Assert.assertEquals(NullPointerException.class, e.getClass()); + } + } + + @Test + public void testRetryFailure() { + Retry<Void> retriable = new Retry<Void>(NullPointerException.class) { + @Override + public Void execute() { + throw new RuntimeException(); + } + }; + try { + retriable.run(); + } catch (Exception e) { + Assert.assertEquals(RuntimeException.class, e.getClass()); + } + } +} diff --git a/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/package-info.java b/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/package-info.java new file mode 100644 index 0000000..2eb51c8 --- /dev/null +++ b/standalone-metastore/metastore-common/src/test/java/org/apache/hadoop/hive/metastore/utils/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package consisting the utility methods for metastore. + */ +package org.apache.hadoop.hive.metastore.utils; diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java index dda407a..8d77ffe 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java @@ -272,7 +272,7 @@ public class HiveAlterHandler implements AlterHandler { } // check that src exists and also checks permissions necessary, rename src to dest if (srcFs.exists(srcPath) && wh.renameDir(srcPath, destPath, - ReplChangeManager.isSourceOfReplication(olddb))) { + ReplChangeManager.shouldEnableCm(olddb, oldt))) { dataWasMoved = true; } } catch (IOException | MetaException e) { @@ -428,7 +428,8 @@ public class HiveAlterHandler implements AlterHandler { Path deleteOldDataLoc = new Path(oldt.getSd().getLocation()); boolean isAutoPurge = "true".equalsIgnoreCase(oldt.getParameters().get("auto.purge")); try { - wh.deleteDir(deleteOldDataLoc, true, isAutoPurge, olddb); + wh.deleteDir(deleteOldDataLoc, true, isAutoPurge, + ReplChangeManager.shouldEnableCm(olddb, oldt)); LOG.info("Deleted the old data location: {} for the table: {}", deleteOldDataLoc, dbname + "." + name); } catch (MetaException ex) { @@ -651,7 +652,7 @@ public class HiveAlterHandler implements AlterHandler { } //rename the data directory - wh.renameDir(srcPath, destPath, ReplChangeManager.isSourceOfReplication(db)); + wh.renameDir(srcPath, destPath, ReplChangeManager.shouldEnableCm(db, tbl)); LOG.info("Partition directory rename from " + srcPath + " to " + destPath + " done."); dataWasMoved = true; } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index b8418c5..ee9f988 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -2228,7 +2228,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { if (!success) { ms.rollbackTransaction(); if (madeDir) { - wh.deleteDir(tblPath, true, db); + wh.deleteDir(tblPath, true, false, ReplChangeManager.shouldEnableCm(db, tbl)); } } @@ -2806,10 +2806,9 @@ public class HiveMetaStore extends ThriftHiveMetastore { } else if (tableDataShouldBeDeleted) { // Data needs deletion. Check if trash may be skipped. // Delete the data in the partitions which have other locations - deletePartitionData(partPaths, ifPurge, db); + deletePartitionData(partPaths, ifPurge, ReplChangeManager.shouldEnableCm(db, tbl)); // Delete the data in the table - deleteTableData(tblPath, ifPurge, db); - // ok even if the data is not deleted + deleteTableData(tblPath, ifPurge, ReplChangeManager.shouldEnableCm(db, tbl)); } if (!listeners.isEmpty()) { @@ -2837,16 +2836,49 @@ public class HiveMetaStore extends ThriftHiveMetastore { * @param tablePath * @param ifPurge completely purge the table (skipping trash) while removing * data from warehouse - * @param db database the table belongs to + * @param shouldEnableCm If cm should be enabled */ - private void deleteTableData(Path tablePath, boolean ifPurge, Database db) { + private void deleteTableData(Path tablePath, boolean ifPurge, boolean shouldEnableCm) { + if (tablePath != null) { + try { + if (shouldEnableCm) { + //Don't delete cmdir if its inside the table path + FileStatus[] statuses = tablePath.getFileSystem(conf).listStatus(tablePath, + ReplChangeManager.CMROOT_PATH_FILTER); + for (final FileStatus status : statuses) { + wh.deleteDir(status.getPath(), true, ifPurge, shouldEnableCm); + } + //Check if table directory is empty, delete it + FileStatus[] statusWithoutFilter = tablePath.getFileSystem(conf).listStatus(tablePath); + if (statusWithoutFilter.length == 0) { + wh.deleteDir(tablePath, true, ifPurge, shouldEnableCm); + } + } else { + //If no cm delete the complete table directory + wh.deleteDir(tablePath, true, ifPurge, shouldEnableCm); + } + } catch (Exception e) { + LOG.error("Failed to delete table directory: " + tablePath + + " " + e.getMessage()); + } + } + } + /** + * Deletes the data in a table's location, if it fails logs an error. + * + * @param tablePath + * @param ifPurge completely purge the table (skipping trash) while removing + * data from warehouse + * @param db Database + */ + private void deleteTableData(Path tablePath, boolean ifPurge, Database db) { if (tablePath != null) { try { wh.deleteDir(tablePath, true, ifPurge, db); } catch (Exception e) { LOG.error("Failed to delete table directory: " + tablePath + - " " + e.getMessage()); + " " + e.getMessage()); } } } @@ -2858,8 +2890,45 @@ public class HiveMetaStore extends ThriftHiveMetastore { * @param partPaths * @param ifPurge completely purge the partition (skipping trash) while * removing data from warehouse - * @param db database the partition belongs to + * @param shouldEnableCm If cm should be enabled */ + private void deletePartitionData(List<Path> partPaths, boolean ifPurge, boolean shouldEnableCm) { + if (partPaths != null && !partPaths.isEmpty()) { + for (Path partPath : partPaths) { + try { + if (shouldEnableCm) { + //Don't delete cmdir if its inside the partition path + FileStatus[] statuses = partPath.getFileSystem(conf).listStatus(partPath, + ReplChangeManager.CMROOT_PATH_FILTER); + for (final FileStatus status : statuses) { + wh.deleteDir(status.getPath(), true, ifPurge, shouldEnableCm); + } + //Check if table directory is empty, delete it + FileStatus[] statusWithoutFilter = partPath.getFileSystem(conf).listStatus(partPath); + if (statusWithoutFilter.length == 0) { + wh.deleteDir(partPath, true, ifPurge, shouldEnableCm); + } + } else { + //If no cm delete the complete table directory + wh.deleteDir(partPath, true, ifPurge, shouldEnableCm); + } + } catch (Exception e) { + LOG.error("Failed to delete partition directory: " + partPath + + " " + e.getMessage()); + } + } + } + } + + /** + * Give a list of partitions' locations, tries to delete each one + * and for each that fails logs an error. + * + * @param partPaths + * @param ifPurge completely purge the partition (skipping trash) while + * removing data from warehouse + * @param db Database + */ private void deletePartitionData(List<Path> partPaths, boolean ifPurge, Database db) { if (partPaths != null && !partPaths.isEmpty()) { for (Path partPath : partPaths) { @@ -2867,7 +2936,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { wh.deleteDir(partPath, true, ifPurge, db); } catch (Exception e) { LOG.error("Failed to delete partition directory: " + partPath + - " " + e.getMessage()); + " " + e.getMessage()); } } } @@ -3125,7 +3194,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { HdfsUtils.HadoopFileStatus status = new HdfsUtils.HadoopFileStatus(getConf(), fs, location); FileStatus targetStatus = fs.getFileStatus(location); String targetGroup = targetStatus == null ? null : targetStatus.getGroup(); - wh.deleteDir(location, true, isAutopurge, db); + wh.deleteDir(location, true, isAutopurge, ReplChangeManager.shouldEnableCm(db, tbl)); fs.mkdirs(location); HdfsUtils.setFullFileStatus(getConf(), status, targetGroup, fs, location, false); } else { @@ -3134,7 +3203,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { continue; } for (final FileStatus status : statuses) { - wh.deleteDir(status.getPath(), true, isAutopurge, db); + wh.deleteDir(status.getPath(), true, isAutopurge, ReplChangeManager.shouldEnableCm(db, tbl)); } } } @@ -3619,7 +3688,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { if (!success) { ms.rollbackTransaction(); if (madeDir) { - wh.deleteDir(partLocation, true, db); + wh.deleteDir(partLocation, true, false, ReplChangeManager.shouldEnableCm(db, tbl)); } } @@ -4352,8 +4421,8 @@ public class HiveMetaStore extends ThriftHiveMetastore { success = ms.addPartition(part); } finally { if (!success && madeDir) { - wh.deleteDir(new Path(part.getSd().getLocation()), true, - ms.getDatabase(tbl.getCatName(), tbl.getDbName())); + wh.deleteDir(new Path(part.getSd().getLocation()), true, false, + ReplChangeManager.shouldEnableCm(ms.getDatabase(part.getCatName(), part.getDbName()), tbl)); } } @@ -4623,7 +4692,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { Path archiveParentDir = null; boolean mustPurge = false; boolean tableDataShouldBeDeleted = false; - boolean isSourceOfReplication = false; + boolean needsCm = false; Map<String, String> transactionalListenerResponses = Collections.emptyMap(); if (db_name == null) { @@ -4671,7 +4740,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { new DropPartitionEvent(tbl, part, true, deleteData, this), envContext); } - isSourceOfReplication = ReplChangeManager.isSourceOfReplication(ms.getDatabase(catName, db_name)); + needsCm = ReplChangeManager.shouldEnableCm(ms.getDatabase(catName, db_name), tbl); success = ms.commitTransaction(); } } finally { @@ -4690,11 +4759,11 @@ public class HiveMetaStore extends ThriftHiveMetastore { if (isArchived) { assert (archiveParentDir != null); - wh.deleteDir(archiveParentDir, true, mustPurge, isSourceOfReplication); + wh.deleteDir(archiveParentDir, true, mustPurge, needsCm); } else { assert (partPath != null); - wh.deleteDir(partPath, true, mustPurge, isSourceOfReplication); - deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge, isSourceOfReplication); + wh.deleteDir(partPath, true, mustPurge, needsCm); + deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge, needsCm); } // ok even if the data is not deleted } @@ -4770,7 +4839,8 @@ public class HiveMetaStore extends ThriftHiveMetastore { List<Partition> parts = null; boolean mustPurge = false; List<Map<String, String>> transactionalListenerResponses = Lists.newArrayList(); - boolean isSourceOfReplication = ReplChangeManager.isSourceOfReplication(ms.getDatabase(catName, dbName)); + boolean needsCm = ReplChangeManager.shouldEnableCm(ms.getDatabase(catName, dbName), + ms.getTable(catName, dbName, tblName)); try { // We need Partition-s for firing events and for result; DN needs MPartition-s to drop. @@ -4878,12 +4948,12 @@ public class HiveMetaStore extends ThriftHiveMetastore { // Archived partitions have har:/to_har_file as their location. // The original directory was saved in params for (Path path : archToDelete) { - wh.deleteDir(path, true, mustPurge, isSourceOfReplication); + wh.deleteDir(path, true, mustPurge, needsCm); } for (PathAndPartValSize p : dirsToDelete) { - wh.deleteDir(p.path, true, mustPurge, isSourceOfReplication); + wh.deleteDir(p.path, true, mustPurge, needsCm); try { - deleteParentRecursive(p.path.getParent(), p.partValSize - 1, mustPurge, isSourceOfReplication); + deleteParentRecursive(p.path.getParent(), p.partValSize - 1, mustPurge, needsCm); } catch (IOException ex) { LOG.warn("Error from deleteParentRecursive", ex); throw new MetaException("Failed to delete parent: " + ex.getMessage()); @@ -7785,7 +7855,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { if (func == null) { throw new NoSuchObjectException("Function " + funcName + " does not exist"); } - Boolean isSourceOfReplication = + Boolean needsCm = ReplChangeManager.isSourceOfReplication(get_database_core(parsedDbName[CAT_NAME], parsedDbName[DB_NAME])); // if copy of jar to change management fails we fail the metastore transaction, since the @@ -7793,7 +7863,7 @@ public class HiveMetaStore extends ThriftHiveMetastore { // a copy is required to allow incremental replication to work correctly. if (func.getResourceUris() != null && !func.getResourceUris().isEmpty()) { for (ResourceUri uri : func.getResourceUris()) { - if (uri.getUri().toLowerCase().startsWith("hdfs:") && isSourceOfReplication) { + if (uri.getUri().toLowerCase().startsWith("hdfs:") && needsCm) { wh.addToChangeManagement(new Path(uri.getUri())); } }