This is an automated email from the ASF dual-hosted git repository. anishek pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 63dd09c HIVE-23474: Deny Repl Dump if the database is a target of replication (Aasha Medhi, reviewed by Pravin Kumar Sinha) 63dd09c is described below commit 63dd09c941c9372e9fb30a50d5567d4a86255a2a Author: Anishek Agarwal <anis...@gmail.com> AuthorDate: Mon Jul 20 09:53:56 2020 +0530 HIVE-23474: Deny Repl Dump if the database is a target of replication (Aasha Medhi, reviewed by Pravin Kumar Sinha) --- .../java/org/apache/hadoop/hive/ql/ErrorMsg.java | 3 ++- .../parse/TestReplicationScenariosAcidTables.java | 22 ++++++++++++++++++++++ .../TestReplicationScenariosAcrossInstances.java | 4 +++- .../hadoop/hive/ql/parse/WarehouseInstance.java | 6 ++++++ .../ql/exec/repl/bootstrap/load/LoadDatabase.java | 3 +++ .../hadoop/hive/ql/exec/repl/util/ReplUtils.java | 12 ++++++++++++ .../hive/ql/parse/ReplicationSemanticAnalyzer.java | 4 ++++ 7 files changed, 52 insertions(+), 2 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 4b63653..d943412 100644 --- a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -615,7 +615,8 @@ public enum ErrorMsg { //========================== 40000 range starts here ========================// SPARK_JOB_RUNTIME_ERROR(40001, "Spark job failed due to: {0}", true), - SPARK_TASK_RUNTIME_ERROR(40002, "Spark job failed due to task failures: {0}", true) + SPARK_TASK_RUNTIME_ERROR(40002, "Spark job failed due to task failures: {0}", true), + REPL_DATABASE_IS_TARGET_OF_REPLICATION(40003, "Cannot dump database as it is a Target of replication.") ; private int errorCode; diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java index 71326ec..bf8a00d 100644 --- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java @@ -1895,6 +1895,28 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()))); } + @Test + public void testReplTargetOfReplication() throws Throwable { + // Bootstrap + WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null); + replica.load(replicatedDbName, primaryDbName).verifyReplTargetProperty(replicatedDbName); + verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId, true); + + //Try to do a dump on replicated db. It should fail + replica.run("alter database " + replicatedDbName + " set dbproperties ('repl.source.for'='1')"); + try { + replica.dump(replicatedDbName); + } catch (Exception e) { + Assert.assertEquals("Cannot dump database as it is a Target of replication.", e.getMessage()); + } + replica.run("alter database " + replicatedDbName + " set dbproperties ('repl.source.for'='')"); + + //Try to dump a different db on replica. 
That should succeed + replica.run("create database " + replicatedDbName + "_extra with dbproperties ('repl.source.for' = '1, 2, 3')") + .dump(replicatedDbName + "_extra"); + replica.run("drop database if exists " + replicatedDbName + "_extra cascade"); + } + private void verifyPathExist(FileSystem fs, Path filePath) throws IOException { assertTrue("Path not found:" + filePath, fs.exists(filePath)); } diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index d7b360c..60074ae 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -63,6 +63,7 @@ import java.util.stream.Collectors; import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT; +import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.TARGET_OF_REPLICATION; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -969,7 +970,8 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro replica.load(replicatedDbName, primaryDbName); // first successful incremental load. 
// Bootstrap Repl B -> C - WarehouseInstance.Tuple tupleReplica = replica.dump(replicatedDbName); + WarehouseInstance.Tuple tupleReplica = replica.run("alter database " + replicatedDbName + + " set dbproperties ('" + TARGET_OF_REPLICATION + "' = '')").dump(replicatedDbName); String replDbFromReplica = replicatedDbName + "_dupe"; replica.load(replDbFromReplica, replicatedDbName) .run("use " + replDbFromReplica) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index 498d59c..0a7d5a0 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -468,6 +468,12 @@ public class WarehouseInstance implements Closeable { assertTrue(props.containsKey(ReplConst.REPL_TARGET_TABLE_PROPERTY)); } + public void verifyTargetOfReplProperty(String dbName) throws Exception { + Database db = getDatabase(dbName); + assertTrue(db.getParameters().containsKey(ReplUtils.TARGET_OF_REPLICATION)); + assertTrue(Boolean.getBoolean(db.getParameters().get(ReplUtils.TARGET_OF_REPLICATION))); + } + public WarehouseInstance verifyReplTargetProperty(String dbName, List<String> tblNames) throws Exception { for (String tblName : tblNames) { verifyReplTargetProperty(getTable(dbName, tblName).getParameters()); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java index 85e9add..1444e15 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java @@ -156,6 +156,9 @@ public class LoadDatabase { // done for this database or not. 
If compaction is done before first incremental then duplicate check will fail as // compaction may change the directory structure. parameters.put(ReplUtils.REPL_FIRST_INC_PENDING_FLAG, "true"); + //This flag will be set to identify it is a target of replication. Repl dump won't be allowed on a database + //which is a target of replication. + parameters.put(ReplUtils.TARGET_OF_REPLICATION, "true"); return parameters; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java index bccf56a..eaa6690 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.common.repl.ReplScope; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; +import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.EnvironmentContext; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.utils.StringUtils; @@ -130,6 +131,8 @@ public class ReplUtils { public static final String RANGER_HIVE_SERVICE_NAME = "ranger.plugin.hive.service.name"; public static final String RANGER_CONFIGURATION_RESOURCE_NAME = "ranger-hive-security.xml"; + + public static final String TARGET_OF_REPLICATION = "repl.target.for"; /** * Bootstrap REPL LOAD operation type on the examined object based on ckpt state. 
*/ @@ -210,6 +213,15 @@ public class ReplUtils { return false; } + public static boolean isTargetOfReplication(Database db) { + assert (db != null); + Map<String, String> m = db.getParameters(); + if ((m != null) && (m.containsKey(TARGET_OF_REPLICATION))) { + return !StringUtils.isEmpty(m.get(TARGET_OF_REPLICATION)); + } + return false; + } + public static String getNonEmpty(String configParam, HiveConf hiveConf, String errorMsgFormat) throws SemanticException { String val = hiveConf.get(configParam); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index ed358f3..5e3f3a5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -219,6 +219,10 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { " as it is not a source of replication (repl.source.for)"); throw new SemanticException(ErrorMsg.REPL_DATABASE_IS_NOT_SOURCE_OF_REPLICATION.getMsg()); } + if (ReplUtils.isTargetOfReplication(database)) { + LOG.error("Cannot dump database " + dbNameOrPattern + " as it is a target of replication (repl.target.for)"); + throw new SemanticException(ErrorMsg.REPL_DATABASE_IS_TARGET_OF_REPLICATION.getMsg()); + } } else { throw new SemanticException("Cannot dump database " + dbNameOrPattern + " as it does not exist"); }