Repository: hive
Updated Branches:
  refs/heads/master 0a961aa8f -> 787e6f994


HIVE-19815: Repl dump should not propagate the checkpoint and repl source 
properties (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Anishek Agarwal)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/787e6f99
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/787e6f99
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/787e6f99

Branch: refs/heads/master
Commit: 787e6f99443f6cba4685246ca08f66ed35a7c46c
Parents: 0a961aa
Author: Sankar Hariappan <[email protected]>
Authored: Mon Jun 11 10:28:06 2018 +0530
Committer: Sankar Hariappan <[email protected]>
Committed: Mon Jun 11 10:28:06 2018 +0530

----------------------------------------------------------------------
 ...TestReplicationScenariosAcrossInstances.java | 135 ++++++++++++++++++-
 .../hadoop/hive/ql/parse/WarehouseInstance.java |   8 ++
 .../apache/hadoop/hive/ql/parse/EximUtil.java   |   8 +-
 .../parse/repl/dump/io/PartitionSerializer.java |   8 ++
 .../ql/parse/repl/dump/io/TableSerializer.java  |  14 +-
 .../repl/load/message/AlterDatabaseHandler.java |   6 +-
 6 files changed, 172 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/787e6f99/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 182a772..35437b1 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -468,7 +468,7 @@ public class TestReplicationScenariosAcrossInstances {
     String randomOne = RandomStringUtils.random(10, true, false);
     String randomTwo = RandomStringUtils.random(10, true, false);
     String dbOne = primaryDbName + randomOne;
-    primary.run("alter database default set dbproperties ('repl.source.for' = 
'1, 2, 3')");
+    primary.run("alter database default set dbproperties ('" + 
SOURCE_OF_REPLICATION + "' = '1, 2, 3')");
     WarehouseInstance.Tuple bootstrapTuple = primary
         .run("use " + primaryDbName)
         .run("create table t1 (i int, j int)")
@@ -777,6 +777,14 @@ public class TestReplicationScenariosAcrossInstances {
     assertTrue(props.get(ReplUtils.REPL_CHECKPOINT_KEY).equals(dumpDir));
   }
 
+  private void verifyIfCkptPropMissing(Map<String, String> props) {
+    assertFalse(props.containsKey(ReplUtils.REPL_CHECKPOINT_KEY));
+  }
+
+  private void verifyIfSrcOfReplPropMissing(Map<String, String> props) {
+    assertFalse(props.containsKey(SOURCE_OF_REPLICATION));
+  }
+
   @Test
   public void testIfCkptSetForObjectsByBootstrapReplLoad() throws Throwable {
     WarehouseInstance.Tuple tuple = primary
@@ -811,4 +819,129 @@ public class TestReplicationScenariosAcrossInstances {
     Partition uk = replica.getPartition(replicatedDbName, "t2", 
Collections.singletonList("uk"));
     verifyIfCkptSet(uk.getParameters(), tuple.dumpLocation);
   }
+
+  @Test
+  public void testIfCkptAndSourceOfReplPropsIgnoredByReplDump() throws 
Throwable {
+    WarehouseInstance.Tuple tuplePrimary = primary
+            .run("use " + primaryDbName)
+            .run("create table t1 (place string) partitioned by (country 
string) "
+                    + " tblproperties('custom.property'='custom.value')")
+            .run("insert into table t1 partition(country='india') values 
('bangalore')")
+            .dump(primaryDbName, null);
+
+    // Bootstrap Repl A -> B
+    WarehouseInstance.Tuple tupleReplica = replica.load(replicatedDbName, 
tuplePrimary.dumpLocation)
+            .run("repl status " + replicatedDbName)
+            .verifyResult(tuplePrimary.lastReplicationId)
+            .run("show tblproperties t1('custom.property')")
+            .verifyResults(new String[] { "custom.value\t " })
+            .dumpFailure(replicatedDbName, null)
+            .run("alter database " + replicatedDbName
+                    + " set dbproperties ('" + SOURCE_OF_REPLICATION + "' = 
'1, 2, 3')")
+            .dump(replicatedDbName, null);
+
+    // Bootstrap Repl B -> C
+    String replDbFromReplica = replicatedDbName + "_dupe";
+    replica.load(replDbFromReplica, tupleReplica.dumpLocation)
+            .run("use " + replDbFromReplica)
+            .run("repl status " + replDbFromReplica)
+            .verifyResult(tupleReplica.lastReplicationId)
+            .run("show tables")
+            .verifyResults(new String[] { "t1" })
+            .run("select country from t1")
+            .verifyResults(Arrays.asList("india"))
+            .run("show tblproperties t1('custom.property')")
+            .verifyResults(new String[] { "custom.value\t " });
+
+    // Check that the DB in C doesn't have the repl.source.for prop. Also ensure the ckpt property on
+    // DB/table/partition is set to the bootstrap dump location used in C.
+    Database db = replica.getDatabase(replDbFromReplica);
+    verifyIfSrcOfReplPropMissing(db.getParameters());
+    verifyIfCkptSet(db.getParameters(), tupleReplica.dumpLocation);
+    Table t1 = replica.getTable(replDbFromReplica, "t1");
+    verifyIfCkptSet(t1.getParameters(), tupleReplica.dumpLocation);
+    Partition india = replica.getPartition(replDbFromReplica, "t1", 
Collections.singletonList("india"));
+    verifyIfCkptSet(india.getParameters(), tupleReplica.dumpLocation);
+
+    // Perform alters in A for incremental replication
+    WarehouseInstance.Tuple tuplePrimaryInc = primary.run("use " + 
primaryDbName)
+            .run("alter database " + primaryDbName + " set 
dbproperties('dummy_key'='dummy_val')")
+            .run("alter table t1 set tblproperties('dummy_key'='dummy_val')")
+            .run("alter table t1 partition(country='india') set fileformat 
orc")
+            .dump(primaryDbName, tuplePrimary.lastReplicationId);
+
+    // Incremental Repl A -> B with alters on db/table/partition
+    WarehouseInstance.Tuple tupleReplicaInc = replica.load(replicatedDbName, 
tuplePrimaryInc.dumpLocation)
+            .run("repl status " + replicatedDbName)
+            .verifyResult(tuplePrimaryInc.lastReplicationId)
+            .dump(replicatedDbName, tupleReplica.lastReplicationId);
+
+    // Check that the DB in B has the ckpt property set to the bootstrap dump location used in B, and that it is missing for table/partition.
+    db = replica.getDatabase(replicatedDbName);
+    verifyIfCkptSet(db.getParameters(), tuplePrimary.dumpLocation);
+    t1 = replica.getTable(replicatedDbName, "t1");
+    verifyIfCkptPropMissing(t1.getParameters());
+    india = replica.getPartition(replicatedDbName, "t1", 
Collections.singletonList("india"));
+    verifyIfCkptPropMissing(india.getParameters());
+
+    // Incremental Repl B -> C with alters on db/table/partition
+    replica.load(replDbFromReplica, tupleReplicaInc.dumpLocation)
+            .run("use " + replDbFromReplica)
+            .run("repl status " + replDbFromReplica)
+            .verifyResult(tupleReplicaInc.lastReplicationId)
+            .run("show tblproperties t1('custom.property')")
+            .verifyResults(new String[] { "custom.value\t " });
+
+    // Check that the DB in C doesn't have the repl.source.for prop. Also ensure the ckpt property
+    // in DB is set to the bootstrap dump location used in C, but for table/partition it is missing.
+    db = replica.getDatabase(replDbFromReplica);
+    verifyIfCkptSet(db.getParameters(), tupleReplica.dumpLocation);
+    verifyIfSrcOfReplPropMissing(db.getParameters());
+    t1 = replica.getTable(replDbFromReplica, "t1");
+    verifyIfCkptPropMissing(t1.getParameters());
+    india = replica.getPartition(replDbFromReplica, "t1", 
Collections.singletonList("india"));
+    verifyIfCkptPropMissing(india.getParameters());
+
+    replica.run("drop database if exists " + replDbFromReplica + " cascade");
+  }
+
+  @Test
+  public void testIfCkptPropIgnoredByExport() throws Throwable {
+    WarehouseInstance.Tuple tuplePrimary = primary
+            .run("use " + primaryDbName)
+            .run("create table t1 (place string) partitioned by (country 
string)")
+            .run("insert into table t1 partition(country='india') values 
('bangalore')")
+            .dump(primaryDbName, null);
+
+    // Bootstrap Repl A -> B and then export table t1
+    String path = "hdfs:///tmp/" + replicatedDbName + "/";
+    String exportPath = "'" + path + "1/'";
+    replica.load(replicatedDbName, tuplePrimary.dumpLocation)
+            .run("repl status " + replicatedDbName)
+            .verifyResult(tuplePrimary.lastReplicationId)
+            .run("use " + replicatedDbName)
+            .run("export table t1 to " + exportPath);
+
+    // Check that the ckpt property is set in table/partition in B after bootstrap load.
+    Table t1 = replica.getTable(replicatedDbName, "t1");
+    verifyIfCkptSet(t1.getParameters(), tuplePrimary.dumpLocation);
+    Partition india = replica.getPartition(replicatedDbName, "t1", 
Collections.singletonList("india"));
+    verifyIfCkptSet(india.getParameters(), tuplePrimary.dumpLocation);
+
+    // Import table t1 to C
+    String importDbFromReplica = replicatedDbName + "_dupe";
+    replica.run("create database " + importDbFromReplica)
+            .run("use " + importDbFromReplica)
+            .run("import table t1 from " + exportPath)
+            .run("select country from t1")
+            .verifyResults(Arrays.asList("india"));
+
+    // Check if table/partition in C doesn't have ckpt property
+    t1 = replica.getTable(importDbFromReplica, "t1");
+    verifyIfCkptPropMissing(t1.getParameters());
+    india = replica.getPartition(importDbFromReplica, "t1", 
Collections.singletonList("india"));
+    verifyIfCkptPropMissing(india.getParameters());
+
+    replica.run("drop database if exists " + importDbFromReplica + " cascade");
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/787e6f99/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 17fd799..79f145c 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -217,6 +217,14 @@ public class WarehouseInstance implements Closeable {
     return dump(dbName, lastReplicationId, Collections.emptyList());
   }
 
+  WarehouseInstance dumpFailure(String dbName, String lastReplicationId) 
throws Throwable {
+    String dumpCommand =
+            "REPL DUMP " + dbName + (lastReplicationId == null ? "" : " FROM " 
+ lastReplicationId);
+    advanceDumpDir();
+    runFailure(dumpCommand);
+    return this;
+  }
+
   WarehouseInstance load(String replicatedDbName, String dumpLocation) throws 
Throwable {
     run("EXPLAIN REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + 
"'");
     printOutput();

http://git-wip-us.apache.org/repos/asf/hive/blob/787e6f99/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 0d2fafb..0a5ecf9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -24,11 +24,13 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -255,12 +257,14 @@ public class EximUtil {
     // If we later make this work for non-repl cases, analysis of this logic 
might become necessary. Also, this is using
     // Replv2 semantics, i.e. with listFiles laziness (no copy at export time)
 
-    // Remove all the entries from the parameters which are added for 
bootstrap dump progress
+    // Remove all the entries from the parameters which are added by repl 
tasks internally.
     Map<String, String> parameters = dbObj.getParameters();
     if (parameters != null) {
       Map<String, String> tmpParameters = new HashMap<>(parameters);
       tmpParameters.entrySet()
-                .removeIf(e -> 
e.getKey().startsWith(Utils.BOOTSTRAP_DUMP_STATE_KEY_PREFIX));
+                .removeIf(e -> 
e.getKey().startsWith(Utils.BOOTSTRAP_DUMP_STATE_KEY_PREFIX)
+                            || e.getKey().equals(ReplUtils.REPL_CHECKPOINT_KEY)
+                            || 
e.getKey().equals(ReplChangeManager.SOURCE_OF_REPLICATION));
       dbObj.setParameters(tmpParameters);
     }
     try (JsonWriter jsonWriter = new JsonWriter(fs, metadataPath)) {

http://git-wip-us.apache.org/repos/asf/hive/blob/787e6f99/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
index ce83523..9fdf742 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.io;
 
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.thrift.TException;
@@ -41,6 +42,13 @@ public class PartitionSerializer implements 
JsonWriter.Serializer {
       throws SemanticException, IOException {
     TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
     try {
+      // Remove all the entries from the parameters which are added by repl 
tasks internally.
+      Map<String, String> parameters = partition.getParameters();
+      if (parameters != null) {
+        parameters.entrySet()
+                .removeIf(e -> 
e.getKey().equals(ReplUtils.REPL_CHECKPOINT_KEY));
+      }
+
       if (additionalPropertiesProvider.isInReplicationScope()) {
         // Current replication state must be set on the Partition object only 
for bootstrap dump.
         // Event replication State will be null in case of bootstrap dump.

http://git-wip-us.apache.org/repos/asf/hive/blob/787e6f99/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
index 143808b..70f4fed 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -53,7 +54,7 @@ public class TableSerializer implements JsonWriter.Serializer 
{
     }
 
     Table tTable = tableHandle.getTTable();
-    tTable = addPropertiesToTable(tTable, additionalPropertiesProvider);
+    tTable = updatePropertiesInTable(tTable, additionalPropertiesProvider);
     try {
       TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
       writer.jsonGenerator
@@ -65,7 +66,14 @@ public class TableSerializer implements 
JsonWriter.Serializer {
     }
   }
 
-  private Table addPropertiesToTable(Table table, ReplicationSpec 
additionalPropertiesProvider) {
+  private Table updatePropertiesInTable(Table table, ReplicationSpec 
additionalPropertiesProvider) {
+    // Remove all the entries from the parameters which are added by repl 
tasks internally.
+    Map<String, String> parameters = table.getParameters();
+    if (parameters != null) {
+      parameters.entrySet()
+              .removeIf(e -> e.getKey().equals(ReplUtils.REPL_CHECKPOINT_KEY));
+    }
+
     if (additionalPropertiesProvider.isInReplicationScope()) {
       // Current replication state must be set on the Table object only for 
bootstrap dump.
       // Event replication State will be null in case of bootstrap dump.
@@ -87,7 +95,7 @@ public class TableSerializer implements JsonWriter.Serializer 
{
       // ReplicationSpec.KEY scopeKey = ReplicationSpec.KEY.REPL_SCOPE;
       // write(out, ",\""+ scopeKey.toString() +"\":\"" + 
replicationSpec.get(scopeKey) + "\"");
       // TODO: if we want to be explicit about this dump not being a 
replication dump, we can
-      // uncomment this else section, but currently unnneeded. Will require a 
lot of golden file
+      // uncomment this else section, but currently unneeded. Will require a 
lot of golden file
       // regen if we do so.
     }
     return table;

http://git-wip-us.apache.org/repos/asf/hive/blob/787e6f99/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
index 00ce977..b59cdf2 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
@@ -17,10 +17,12 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.load.message;
 
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
@@ -66,7 +68,9 @@ public class AlterDatabaseHandler extends 
AbstractMessageHandler {
           String key = entry.getKey();
           // Ignore the keys which are local to source warehouse
           if (key.startsWith(Utils.BOOTSTRAP_DUMP_STATE_KEY_PREFIX)
-                  || key.equals(ReplicationSpec.KEY.CURR_STATE_ID.toString())) 
{
+                  || key.equals(ReplicationSpec.KEY.CURR_STATE_ID.toString())
+                  || key.equals(ReplUtils.REPL_CHECKPOINT_KEY)
+                  || key.equals(ReplChangeManager.SOURCE_OF_REPLICATION)) {
             continue;
           }
           newDbProps.put(key, entry.getValue());

Reply via email to