Repository: hive
Updated Branches:
  refs/heads/branch-3 c085aaa58 -> 72ebbff0e


HIVE-19815: Repl dump should not propagate the checkpoint and repl source 
properties (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Anishek Agarwal)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/72ebbff0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/72ebbff0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/72ebbff0

Branch: refs/heads/branch-3
Commit: 72ebbff0e90aadfe21705f11a34d0714ba610b10
Parents: c085aaa
Author: Sankar Hariappan <[email protected]>
Authored: Tue Jun 12 09:31:04 2018 +0530
Committer: Sankar Hariappan <[email protected]>
Committed: Tue Jun 12 09:31:04 2018 +0530

----------------------------------------------------------------------
 ...TestReplicationScenariosAcrossInstances.java | 244 ++++++++++++++-----
 .../hadoop/hive/ql/parse/WarehouseInstance.java |   8 +
 .../apache/hadoop/hive/ql/parse/EximUtil.java   |   8 +-
 .../parse/repl/dump/io/PartitionSerializer.java |   8 +
 .../ql/parse/repl/dump/io/TableSerializer.java  |  14 +-
 .../repl/load/message/AlterDatabaseHandler.java |   6 +-
 6 files changed, 226 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/72ebbff0/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 6167459..35437b1 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -468,7 +468,7 @@ public class TestReplicationScenariosAcrossInstances {
     String randomOne = RandomStringUtils.random(10, true, false);
     String randomTwo = RandomStringUtils.random(10, true, false);
     String dbOne = primaryDbName + randomOne;
-    primary.run("alter database default set dbproperties ('repl.source.for' = 
'1, 2, 3')");
+    primary.run("alter database default set dbproperties ('" + 
SOURCE_OF_REPLICATION + "' = '1, 2, 3')");
     WarehouseInstance.Tuple bootstrapTuple = primary
         .run("use " + primaryDbName)
         .run("create table t1 (i int, j int)")
@@ -699,55 +699,77 @@ public class TestReplicationScenariosAcrossInstances {
 
     // Bootstrap load in replica
     replica.load(replicatedDbName, bootstrapTuple.dumpLocation)
-            .status(replicatedDbName)
-            .verifyResult(bootstrapTuple.lastReplicationId);
+        .status(replicatedDbName)
+        .verifyResult(bootstrapTuple.lastReplicationId);
 
     // First incremental dump
     WarehouseInstance.Tuple firstIncremental = primary.run("use " + 
primaryDbName)
-            .run("create table table1 (id int) partitioned by (country 
string)")
-            .run("create table table2 (id int)")
-            .run("create table table3 (id int) partitioned by (country 
string)")
-            .run("insert into table1 partition(country='india') values(1)")
-            .run("insert into table2 values(2)")
-            .run("insert into table3 partition(country='india') values(3)")
-            .dump(primaryDbName, bootstrapTuple.lastReplicationId);
+        .run("create table table1 (id int) partitioned by (country string)")
+        .run("create table table2 (id int)")
+        .run("create table table3 (id int) partitioned by (country string)")
+        .run("insert into table1 partition(country='india') values(1)")
+        .run("insert into table2 values(2)")
+        .run("insert into table3 partition(country='india') values(3)")
+        .dump(primaryDbName, bootstrapTuple.lastReplicationId);
 
     // Second incremental dump
     WarehouseInstance.Tuple secondIncremental = primary.run("use " + 
primaryDbName)
-            .run("drop table table1")
-            .run("drop table table2")
-            .run("drop table table3")
-            .run("create table table1 (id int)")
-            .run("insert into table1 values (10)")
-            .run("create table table2 (id int) partitioned by (country 
string)")
-            .run("insert into table2 partition(country='india') values(20)")
-            .run("create table table3 (id int) partitioned by (name string, 
rank int)")
-            .run("insert into table3 partition(name='adam', rank=100) 
values(30)")
-            .dump(primaryDbName, firstIncremental.lastReplicationId);
+        .run("drop table table1")
+        .run("drop table table2")
+        .run("drop table table3")
+        .run("create table table1 (id int)")
+        .run("insert into table1 values (10)")
+        .run("create table table2 (id int) partitioned by (country string)")
+        .run("insert into table2 partition(country='india') values(20)")
+        .run("create table table3 (id int) partitioned by (name string, rank 
int)")
+        .run("insert into table3 partition(name='adam', rank=100) values(30)")
+        .dump(primaryDbName, firstIncremental.lastReplicationId);
 
     // First incremental load
     replica.load(replicatedDbName, firstIncremental.dumpLocation)
-            .status(replicatedDbName)
-            .verifyResult(firstIncremental.lastReplicationId)
-            .run("use " + replicatedDbName)
-            .run("select id from table1")
-            .verifyResults(new String[] {"1"})
-            .run("select * from table2")
-            .verifyResults(new String[] {"2"})
-            .run("select id from table3")
-            .verifyResults(new String[] {"3"});
+        .status(replicatedDbName)
+        .verifyResult(firstIncremental.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("select id from table1")
+        .verifyResults(new String[] { "1" })
+        .run("select * from table2")
+        .verifyResults(new String[] { "2" })
+        .run("select id from table3")
+        .verifyResults(new String[] { "3" });
 
     // Second incremental load
     replica.load(replicatedDbName, secondIncremental.dumpLocation)
-            .status(replicatedDbName)
-            .verifyResult(secondIncremental.lastReplicationId)
-            .run("use " + replicatedDbName)
-            .run("select * from table1")
-            .verifyResults(new String[] {"10"})
-            .run("select id from table2")
-            .verifyResults(new String[] {"20"})
-            .run("select id from table3")
-            .verifyResults(new String[] {"30"});
+        .status(replicatedDbName)
+        .verifyResult(secondIncremental.lastReplicationId)
+        .run("use " + replicatedDbName)
+        .run("select * from table1")
+        .verifyResults(new String[] { "10" })
+        .run("select id from table2")
+        .verifyResults(new String[] { "20" })
+        .run("select id from table3")
+        .verifyResults(new String[] {"30"});
+  }
+
+  @Test
+  public void shouldNotCreateDirectoryForNonNativeTableInDumpDirectory() 
throws Throwable {
+    String createTableQuery =
+        "CREATE TABLE custom_serdes( serde_id bigint COMMENT 'from 
deserializer', name string "
+            + "COMMENT 'from deserializer', slib string COMMENT 'from 
deserializer') "
+            + "ROW FORMAT SERDE 'org.apache.hive.storage.jdbc.JdbcSerDe' "
+            + "STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler' "
+            + "WITH SERDEPROPERTIES ('serialization.format'='1') "
+            + "TBLPROPERTIES ( "
+            + "'hive.sql.database.type'='METASTORE', "
+            + "'hive.sql.query'='SELECT \"SERDE_ID\", \"NAME\", \"SLIB\" FROM 
\"SERDES\"')";
+
+    WarehouseInstance.Tuple bootstrapTuple = primary
+        .run("use " + primaryDbName)
+        .run(createTableQuery).dump(primaryDbName, null);
+    Path cSerdesTableDumpLocation = new Path(
+        new Path(bootstrapTuple.dumpLocation, primaryDbName),
+        "custom_serdes");
+    FileSystem fs = cSerdesTableDumpLocation.getFileSystem(primary.hiveConf);
+    assertFalse(fs.exists(cSerdesTableDumpLocation));
   }
 
   private void verifyIfCkptSet(Map<String, String> props, String dumpDir) {
@@ -755,6 +777,14 @@ public class TestReplicationScenariosAcrossInstances {
     assertTrue(props.get(ReplUtils.REPL_CHECKPOINT_KEY).equals(dumpDir));
   }
 
+  private void verifyIfCkptPropMissing(Map<String, String> props) {
+    assertFalse(props.containsKey(ReplUtils.REPL_CHECKPOINT_KEY));
+  }
+
+  private void verifyIfSrcOfReplPropMissing(Map<String, String> props) {
+    assertFalse(props.containsKey(SOURCE_OF_REPLICATION));
+  }
+
   @Test
   public void testIfCkptSetForObjectsByBootstrapReplLoad() throws Throwable {
     WarehouseInstance.Tuple tuple = primary
@@ -791,25 +821,127 @@ public class TestReplicationScenariosAcrossInstances {
   }
 
   @Test
-  public void shouldNotCreateDirectoryForNonNativeTableInDumpDirectory() 
throws Throwable {
-    String createTableQuery =
-        "CREATE TABLE custom_serdes( serde_id bigint COMMENT 'from 
deserializer', name string "
-            + "COMMENT 'from deserializer', slib string COMMENT 'from 
deserializer') "
-            + "ROW FORMAT SERDE 'org.apache.hive.storage.jdbc.JdbcSerDe' "
-            + "STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler' "
-            + "WITH SERDEPROPERTIES ('serialization.format'='1') "
-            + "TBLPROPERTIES ( "
-            + "'hive.sql.database.type'='METASTORE', "
-            + "'hive.sql.query'='SELECT \"SERDE_ID\", \"NAME\", \"SLIB\" FROM 
\"SERDES\"')";
+  public void testIfCkptAndSourceOfReplPropsIgnoredByReplDump() throws 
Throwable {
+    WarehouseInstance.Tuple tuplePrimary = primary
+            .run("use " + primaryDbName)
+            .run("create table t1 (place string) partitioned by (country 
string) "
+                    + " tblproperties('custom.property'='custom.value')")
+            .run("insert into table t1 partition(country='india') values 
('bangalore')")
+            .dump(primaryDbName, null);
 
-    WarehouseInstance.Tuple bootstrapTuple = primary
-        .run("use " + primaryDbName)
-        .run(createTableQuery).dump(primaryDbName, null);
-    Path cSerdesTableDumpLocation = new Path(
-        new Path(bootstrapTuple.dumpLocation, primaryDbName),
-        "custom_serdes");
-    FileSystem fs = cSerdesTableDumpLocation.getFileSystem(primary.hiveConf);
-    assertFalse(fs.exists(cSerdesTableDumpLocation));
+    // Bootstrap Repl A -> B
+    WarehouseInstance.Tuple tupleReplica = replica.load(replicatedDbName, 
tuplePrimary.dumpLocation)
+            .run("repl status " + replicatedDbName)
+            .verifyResult(tuplePrimary.lastReplicationId)
+            .run("show tblproperties t1('custom.property')")
+            .verifyResults(new String[] { "custom.value\t " })
+            .dumpFailure(replicatedDbName, null)
+            .run("alter database " + replicatedDbName
+                    + " set dbproperties ('" + SOURCE_OF_REPLICATION + "' = 
'1, 2, 3')")
+            .dump(replicatedDbName, null);
+
+    // Bootstrap Repl B -> C
+    String replDbFromReplica = replicatedDbName + "_dupe";
+    replica.load(replDbFromReplica, tupleReplica.dumpLocation)
+            .run("use " + replDbFromReplica)
+            .run("repl status " + replDbFromReplica)
+            .verifyResult(tupleReplica.lastReplicationId)
+            .run("show tables")
+            .verifyResults(new String[] { "t1" })
+            .run("select country from t1")
+            .verifyResults(Arrays.asList("india"))
+            .run("show tblproperties t1('custom.property')")
+            .verifyResults(new String[] { "custom.value\t " });
+
+    // Check if DB/table/partition in C doesn't have repl.source.for props. 
Also ensure, ckpt property
+    // is set to bootstrap dump location used in C.
+    Database db = replica.getDatabase(replDbFromReplica);
+    verifyIfSrcOfReplPropMissing(db.getParameters());
+    verifyIfCkptSet(db.getParameters(), tupleReplica.dumpLocation);
+    Table t1 = replica.getTable(replDbFromReplica, "t1");
+    verifyIfCkptSet(t1.getParameters(), tupleReplica.dumpLocation);
+    Partition india = replica.getPartition(replDbFromReplica, "t1", 
Collections.singletonList("india"));
+    verifyIfCkptSet(india.getParameters(), tupleReplica.dumpLocation);
+
+    // Perform alters in A for incremental replication
+    WarehouseInstance.Tuple tuplePrimaryInc = primary.run("use " + 
primaryDbName)
+            .run("alter database " + primaryDbName + " set 
dbproperties('dummy_key'='dummy_val')")
+            .run("alter table t1 set tblproperties('dummy_key'='dummy_val')")
+            .run("alter table t1 partition(country='india') set fileformat 
orc")
+            .dump(primaryDbName, tuplePrimary.lastReplicationId);
+
+    // Incremental Repl A -> B with alters on db/table/partition
+    WarehouseInstance.Tuple tupleReplicaInc = replica.load(replicatedDbName, 
tuplePrimaryInc.dumpLocation)
+            .run("repl status " + replicatedDbName)
+            .verifyResult(tuplePrimaryInc.lastReplicationId)
+            .dump(replicatedDbName, tupleReplica.lastReplicationId);
+
+    // Check if DB in B have ckpt property is set to bootstrap dump location 
used in B and missing for table/partition.
+    db = replica.getDatabase(replicatedDbName);
+    verifyIfCkptSet(db.getParameters(), tuplePrimary.dumpLocation);
+    t1 = replica.getTable(replicatedDbName, "t1");
+    verifyIfCkptPropMissing(t1.getParameters());
+    india = replica.getPartition(replicatedDbName, "t1", 
Collections.singletonList("india"));
+    verifyIfCkptPropMissing(india.getParameters());
+
+    // Incremental Repl B -> C with alters on db/table/partition
+    replica.load(replDbFromReplica, tupleReplicaInc.dumpLocation)
+            .run("use " + replDbFromReplica)
+            .run("repl status " + replDbFromReplica)
+            .verifyResult(tupleReplicaInc.lastReplicationId)
+            .run("show tblproperties t1('custom.property')")
+            .verifyResults(new String[] { "custom.value\t " });
+
+    // Check if DB/table/partition in C doesn't have repl.source.for props. 
Also ensure, ckpt property
+    // in DB is set to bootstrap dump location used in C but for 
table/partition, it is missing.
+    db = replica.getDatabase(replDbFromReplica);
+    verifyIfCkptSet(db.getParameters(), tupleReplica.dumpLocation);
+    verifyIfSrcOfReplPropMissing(db.getParameters());
+    t1 = replica.getTable(replDbFromReplica, "t1");
+    verifyIfCkptPropMissing(t1.getParameters());
+    india = replica.getPartition(replDbFromReplica, "t1", 
Collections.singletonList("india"));
+    verifyIfCkptPropMissing(india.getParameters());
+
+    replica.run("drop database if exists " + replDbFromReplica + " cascade");
   }
 
+  @Test
+  public void testIfCkptPropIgnoredByExport() throws Throwable {
+    WarehouseInstance.Tuple tuplePrimary = primary
+            .run("use " + primaryDbName)
+            .run("create table t1 (place string) partitioned by (country 
string)")
+            .run("insert into table t1 partition(country='india') values 
('bangalore')")
+            .dump(primaryDbName, null);
+
+    // Bootstrap Repl A -> B and then export table t1
+    String path = "hdfs:///tmp/" + replicatedDbName + "/";
+    String exportPath = "'" + path + "1/'";
+    replica.load(replicatedDbName, tuplePrimary.dumpLocation)
+            .run("repl status " + replicatedDbName)
+            .verifyResult(tuplePrimary.lastReplicationId)
+            .run("use " + replicatedDbName)
+            .run("export table t1 to " + exportPath);
+
+    // Check if ckpt property set in table/partition in B after bootstrap load.
+    Table t1 = replica.getTable(replicatedDbName, "t1");
+    verifyIfCkptSet(t1.getParameters(), tuplePrimary.dumpLocation);
+    Partition india = replica.getPartition(replicatedDbName, "t1", 
Collections.singletonList("india"));
+    verifyIfCkptSet(india.getParameters(), tuplePrimary.dumpLocation);
+
+    // Import table t1 to C
+    String importDbFromReplica = replicatedDbName + "_dupe";
+    replica.run("create database " + importDbFromReplica)
+            .run("use " + importDbFromReplica)
+            .run("import table t1 from " + exportPath)
+            .run("select country from t1")
+            .verifyResults(Arrays.asList("india"));
+
+    // Check if table/partition in C doesn't have ckpt property
+    t1 = replica.getTable(importDbFromReplica, "t1");
+    verifyIfCkptPropMissing(t1.getParameters());
+    india = replica.getPartition(importDbFromReplica, "t1", 
Collections.singletonList("india"));
+    verifyIfCkptPropMissing(india.getParameters());
+
+    replica.run("drop database if exists " + importDbFromReplica + " cascade");
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/72ebbff0/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 29bf160..b38965c 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -217,6 +217,14 @@ public class WarehouseInstance implements Closeable {
     return dump(dbName, lastReplicationId, Collections.emptyList());
   }
 
+  WarehouseInstance dumpFailure(String dbName, String lastReplicationId) 
throws Throwable {
+    String dumpCommand =
+            "REPL DUMP " + dbName + (lastReplicationId == null ? "" : " FROM " 
+ lastReplicationId);
+    advanceDumpDir();
+    runFailure(dumpCommand);
+    return this;
+  }
+
   WarehouseInstance load(String replicatedDbName, String dumpLocation) throws 
Throwable {
     run("EXPLAIN REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + 
"'");
     printOutput();

http://git-wip-us.apache.org/repos/asf/hive/blob/72ebbff0/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 0d2fafb..0a5ecf9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -24,11 +24,13 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -255,12 +257,14 @@ public class EximUtil {
     // If we later make this work for non-repl cases, analysis of this logic 
might become necessary. Also, this is using
     // Replv2 semantics, i.e. with listFiles laziness (no copy at export time)
 
-    // Remove all the entries from the parameters which are added for 
bootstrap dump progress
+    // Remove all the entries from the parameters which are added by repl 
tasks internally.
     Map<String, String> parameters = dbObj.getParameters();
     if (parameters != null) {
       Map<String, String> tmpParameters = new HashMap<>(parameters);
       tmpParameters.entrySet()
-                .removeIf(e -> 
e.getKey().startsWith(Utils.BOOTSTRAP_DUMP_STATE_KEY_PREFIX));
+                .removeIf(e -> 
e.getKey().startsWith(Utils.BOOTSTRAP_DUMP_STATE_KEY_PREFIX)
+                            || e.getKey().equals(ReplUtils.REPL_CHECKPOINT_KEY)
+                            || 
e.getKey().equals(ReplChangeManager.SOURCE_OF_REPLICATION));
       dbObj.setParameters(tmpParameters);
     }
     try (JsonWriter jsonWriter = new JsonWriter(fs, metadataPath)) {

http://git-wip-us.apache.org/repos/asf/hive/blob/72ebbff0/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
index ce83523..9fdf742 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.io;
 
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.thrift.TException;
@@ -41,6 +42,13 @@ public class PartitionSerializer implements 
JsonWriter.Serializer {
       throws SemanticException, IOException {
     TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
     try {
+      // Remove all the entries from the parameters which are added by repl 
tasks internally.
+      Map<String, String> parameters = partition.getParameters();
+      if (parameters != null) {
+        parameters.entrySet()
+                .removeIf(e -> 
e.getKey().equals(ReplUtils.REPL_CHECKPOINT_KEY));
+      }
+
       if (additionalPropertiesProvider.isInReplicationScope()) {
         // Current replication state must be set on the Partition object only 
for bootstrap dump.
         // Event replication State will be null in case of bootstrap dump.

http://git-wip-us.apache.org/repos/asf/hive/blob/72ebbff0/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
index 143808b..70f4fed 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -53,7 +54,7 @@ public class TableSerializer implements JsonWriter.Serializer 
{
     }
 
     Table tTable = tableHandle.getTTable();
-    tTable = addPropertiesToTable(tTable, additionalPropertiesProvider);
+    tTable = updatePropertiesInTable(tTable, additionalPropertiesProvider);
     try {
       TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
       writer.jsonGenerator
@@ -65,7 +66,14 @@ public class TableSerializer implements 
JsonWriter.Serializer {
     }
   }
 
-  private Table addPropertiesToTable(Table table, ReplicationSpec 
additionalPropertiesProvider) {
+  private Table updatePropertiesInTable(Table table, ReplicationSpec 
additionalPropertiesProvider) {
+    // Remove all the entries from the parameters which are added by repl 
tasks internally.
+    Map<String, String> parameters = table.getParameters();
+    if (parameters != null) {
+      parameters.entrySet()
+              .removeIf(e -> e.getKey().equals(ReplUtils.REPL_CHECKPOINT_KEY));
+    }
+
     if (additionalPropertiesProvider.isInReplicationScope()) {
       // Current replication state must be set on the Table object only for 
bootstrap dump.
       // Event replication State will be null in case of bootstrap dump.
@@ -87,7 +95,7 @@ public class TableSerializer implements JsonWriter.Serializer 
{
       // ReplicationSpec.KEY scopeKey = ReplicationSpec.KEY.REPL_SCOPE;
       // write(out, ",\""+ scopeKey.toString() +"\":\"" + 
replicationSpec.get(scopeKey) + "\"");
       // TODO: if we want to be explicit about this dump not being a 
replication dump, we can
-      // uncomment this else section, but currently unnneeded. Will require a 
lot of golden file
+      // uncomment this else section, but currently unneeded. Will require a 
lot of golden file
       // regen if we do so.
     }
     return table;

http://git-wip-us.apache.org/repos/asf/hive/blob/72ebbff0/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
index 00ce977..b59cdf2 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
@@ -17,10 +17,12 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.load.message;
 
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
@@ -66,7 +68,9 @@ public class AlterDatabaseHandler extends 
AbstractMessageHandler {
           String key = entry.getKey();
           // Ignore the keys which are local to source warehouse
           if (key.startsWith(Utils.BOOTSTRAP_DUMP_STATE_KEY_PREFIX)
-                  || key.equals(ReplicationSpec.KEY.CURR_STATE_ID.toString())) 
{
+                  || key.equals(ReplicationSpec.KEY.CURR_STATE_ID.toString())
+                  || key.equals(ReplUtils.REPL_CHECKPOINT_KEY)
+                  || key.equals(ReplChangeManager.SOURCE_OF_REPLICATION)) {
             continue;
           }
           newDbProps.put(key, entry.getValue());

Reply via email to