This is an automated email from the ASF dual-hosted git repository.

sankarh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 7416fac  HIVE-21763: Incremental replication to allow changing 
include/exclude tables list in replication policy (Sankar Hariappan, reviewed 
by Mahesh Kumar Behera)
7416fac is described below

commit 7416fac24d5327db6e8de91317d9ad8a4ce69017
Author: Sankar Hariappan <[email protected]>
AuthorDate: Wed Jun 19 08:46:25 2019 +0530

    HIVE-21763: Incremental replication to allow changing include/exclude 
tables list in replication policy (Sankar Hariappan, reviewed by Mahesh Kumar 
Behera)
    
    Signed-off-by: Sankar Hariappan <[email protected]>
---
 .../hive/ql/parse/TestReplicationScenarios.java    |   2 +-
 .../parse/TestTableLevelReplicationScenarios.java  | 240 ++++++++++++++++++++-
 .../hadoop/hive/ql/parse/WarehouseInstance.java    |  14 +-
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     | 101 ++++++---
 .../hadoop/hive/ql/exec/repl/ReplDumpWork.java     |   4 +-
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |  36 ++++
 .../hadoop/hive/ql/exec/repl/ReplLoadWork.java     |  15 +-
 .../hadoop/hive/ql/exec/repl/util/ReplUtils.java   |   5 +
 .../org/apache/hadoop/hive/ql/parse/HiveParser.g   |  22 +-
 .../hive/ql/parse/ReplicationSemanticAnalyzer.java |  49 ++++-
 .../hive/ql/parse/repl/dump/TableExport.java       |   2 +-
 .../hadoop/hive/ql/parse/repl/dump/Utils.java      |  41 ++--
 .../events/AbstractConstraintEventHandler.java     |   1 +
 .../repl/dump/events/AddPartitionHandler.java      |   3 +-
 .../repl/dump/events/AllocWriteIdHandler.java      |   9 +
 .../repl/dump/events/AlterPartitionHandler.java    |   3 +-
 .../parse/repl/dump/events/AlterTableHandler.java  |   3 +-
 .../parse/repl/dump/events/CommitTxnHandler.java   |   8 +-
 .../parse/repl/dump/events/CreateTableHandler.java |   3 +-
 .../ql/parse/repl/dump/events/EventHandler.java    |   5 +-
 .../ql/parse/repl/dump/events/InsertHandler.java   |   3 +-
 .../repl/dump/events/UpdatePartColStatHandler.java |   4 +-
 .../dump/events/UpdateTableColStatHandler.java     |   3 +-
 .../ql/parse/repl/dump/io/TableSerializer.java     |   2 +-
 .../hive/ql/parse/repl/load/DumpMetaData.java      | 116 ++++++++--
 .../hadoop/hive/ql/exec/repl/TestReplDumpTask.java |   2 +-
 .../apache/hadoop/hive/common/repl/ReplScope.java  |  26 ++-
 27 files changed, 624 insertions(+), 98 deletions(-)

diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 1f41d46..0bc7bb3 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -395,7 +395,7 @@ public class TestReplicationScenarios {
     HiveConf confTemp = new HiveConf();
     confTemp.set("hive.repl.enable.move.optimization", "true");
     ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, tuple.dumpLocation, 
replicadb,
-            null, isIncrementalDump, Long.valueOf(tuple.lastReplId),
+            null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId),
         Collections.emptyList());
     Task replLoadTask = TaskFactory.get(replLoadWork, confTemp);
     replLoadTask.initialize(null, null, new 
DriverContext(driver.getContext()), null);
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
index 7c1d010..f67fc81 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.parse;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
@@ -37,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME;
+import static 
org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME;
 
 /**
  * Tests Table level replication scenarios.
@@ -115,6 +117,14 @@ public class TestTableLevelReplicationScenarios extends 
BaseReplicationScenarios
                                   List<String> dumpWithClause,
                                   List<String> loadWithClause,
                                   String[] expectedTables) throws Throwable {
+    return replicateAndVerify(replPolicy, null, lastReplId, dumpWithClause, 
loadWithClause, null, expectedTables);
+  }
+
+  private String replicateAndVerify(String replPolicy, String oldReplPolicy, 
String lastReplId,
+                                    List<String> dumpWithClause,
+                                    List<String> loadWithClause,
+                                    String[] bootstrappedTables,
+                                    String[] expectedTables) throws Throwable {
     if (dumpWithClause == null) {
       dumpWithClause = new ArrayList<>();
     }
@@ -126,8 +136,11 @@ public class TestTableLevelReplicationScenarios extends 
BaseReplicationScenarios
     if (lastReplId == null) {
       replica.run("drop database if exists " + replicatedDbName + " cascade");
     }
-    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
-            .dump(replPolicy, lastReplId, dumpWithClause);
+    WarehouseInstance.Tuple tuple = primary.dump(replPolicy, oldReplPolicy, 
lastReplId, dumpWithClause);
+
+    if (oldReplPolicy != null) {
+      verifyBootstrapDirInIncrementalDump(tuple.dumpLocation, 
bootstrappedTables);
+    }
 
     replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
             .run("use " + replicatedDbName)
@@ -136,6 +149,31 @@ public class TestTableLevelReplicationScenarios extends 
BaseReplicationScenarios
     return tuple.lastReplicationId;
   }
 
+  private void verifyBootstrapDirInIncrementalDump(String dumpLocation, 
String[] bootstrappedTables)
+          throws Throwable {
+    // _bootstrap directory should be created only when some tables need to be 
bootstrapped as part of this incremental dump.
+    Path dumpPath = new Path(dumpLocation, INC_BOOTSTRAP_ROOT_DIR_NAME);
+
+    // If nothing to be bootstrapped.
+    if (bootstrappedTables.length == 0) {
+      
Assert.assertFalse(primary.miniDFSCluster.getFileSystem().exists(dumpPath));
+      return;
+    }
+
+    Assert.assertTrue(primary.miniDFSCluster.getFileSystem().exists(dumpPath));
+
+    // Check if the DB dump path has any tables other than the ones listed in 
bootstrappedTables.
+    Path dbPath = new Path(dumpPath, primaryDbName);
+    FileStatus[] fileStatuses = 
primary.miniDFSCluster.getFileSystem().listStatus(dbPath);
+    Assert.assertEquals(fileStatuses.length, bootstrappedTables.length);
+
+    // Eg: _bootstrap/<db_name>/t2, _bootstrap/<db_name>/t3 etc
+    for (String tableName : bootstrappedTables) {
+      Path tblPath = new Path(dbPath, tableName);
+      
Assert.assertTrue(primary.miniDFSCluster.getFileSystem().exists(tblPath));
+    }
+  }
+
   @Test
   public void testBasicBootstrapWithIncludeList() throws Throwable {
     String[] originalNonAcidTables = new String[] {"t1", "t2" };
@@ -197,7 +235,7 @@ public class TestTableLevelReplicationScenarios extends 
BaseReplicationScenarios
   }
 
   @Test
-  public void testReplDumpWithIncorrectTablePolicy() throws Throwable {
+  public void testIncorrectTablePolicyInReplDump() throws Throwable {
     String[] originalTables = new String[] {"t1", "t11", "t2", "t3", "t111" };
     createTables(originalTables, CreateTableType.NON_ACID);
 
@@ -224,8 +262,39 @@ public class TestTableLevelReplicationScenarios extends 
BaseReplicationScenarios
       Assert.assertTrue(failed);
     }
 
+    // Test incremental replication with invalid replication policies in 
REPLACE clause.
+    String replPolicy = primaryDbName;
+    WarehouseInstance.Tuple tupleBootstrap = primary.run("use " + 
primaryDbName)
+            .dump(primaryDbName, null);
+    replica.load(replicatedDbName, tupleBootstrap.dumpLocation);
+    String lastReplId = tupleBootstrap.lastReplicationId;
+    for (String oldReplPolicy : invalidReplPolicies) {
+      failed = false;
+      try {
+        replicateAndVerify(replPolicy, oldReplPolicy, lastReplId, null, null, 
null, replicatedTables);
+      } catch (Exception ex) {
+        LOG.info("Got exception: {}", ex.getMessage());
+        Assert.assertTrue(ex instanceof ParseException);
+        failed = true;
+      }
+      Assert.assertTrue(failed);
+    }
+
+    // Replace with replication policy having different DB name.
+    String oldReplPolicy = replPolicy;
+    replPolicy = primaryDbName + "_dupe.['t1+'].['t1']";
+    failed = false;
+    try {
+      replicateAndVerify(replPolicy, oldReplPolicy, lastReplId, null, null, 
null, replicatedTables);
+    } catch (Exception ex) {
+      LOG.info("Got exception: {}", ex.getMessage());
+      Assert.assertTrue(ex instanceof SemanticException);
+      failed = true;
+    }
+    Assert.assertTrue(failed);
+
     // Invalid pattern where we didn't enclose table pattern within single or 
double quotes.
-    String replPolicy = primaryDbName + ".[t1].[t2]";
+    replPolicy = primaryDbName + ".[t1].[t2]";
     failed = false;
     try {
       replicateAndVerify(replPolicy, null, null, null, replicatedTables);
@@ -263,7 +332,14 @@ public class TestTableLevelReplicationScenarios extends 
BaseReplicationScenarios
     // Replicate and verify if 2 tables are replicated as per policy.
     String replPolicy = primaryDbName.toUpperCase() + ".['.*a1+', 'cc3', 
'B2'].['AA1+', 'b2']";
     String[] replicatedTables = new String[] {"a1", "cc3" };
-    replicateAndVerify(replPolicy, null, null, null, replicatedTables);
+    String lastReplId = replicateAndVerify(replPolicy, null, null, null, 
replicatedTables);
+
+    // Test case insensitive nature in REPLACE clause as well.
+    String oldReplPolicy = replPolicy;
+    replPolicy = primaryDbName + ".['.*a1+', 'cc3', 'B2'].['AA1+']";
+    replicatedTables = new String[] {"a1", "b2", "cc3" };
+    String[] bootstrappedTables = new String[] {"b2" };
+    replicateAndVerify(replPolicy, oldReplPolicy, lastReplId, null, null, 
bootstrappedTables, replicatedTables);
   }
 
   @Test
@@ -335,7 +411,8 @@ public class TestTableLevelReplicationScenarios extends 
BaseReplicationScenarios
     );
     String replPolicy = primaryDbName + ".['a[0-9]+', 'b2'].['a1']";
     String[] bootstrapReplicatedTables = new String[] {"b2" };
-    String lastReplId = replicateAndVerify(replPolicy, null, dumpWithClause, 
loadWithClause, bootstrapReplicatedTables);
+    String lastReplId = replicateAndVerify(replPolicy, null,
+            dumpWithClause, loadWithClause, bootstrapReplicatedTables);
 
     // Enable external tables replication and bootstrap in incremental phase.
     String[] incrementalReplicatedTables = new String[] {"a2", "b2" };
@@ -357,4 +434,155 @@ public class TestTableLevelReplicationScenarios extends 
BaseReplicationScenarios
             .run("show tables")
             .verifyResults(incrementalReplicatedTables);
   }
+
+  @Test
+  public void testBasicReplaceReplPolicy() throws Throwable {
+    String[] originalNonAcidTables = new String[] {"t1", "t2" };
+    String[] originalFullAcidTables = new String[] {"t3", "t4" };
+    String[] originalMMAcidTables = new String[] {"t5" };
+    createTables(originalNonAcidTables, CreateTableType.NON_ACID);
+    createTables(originalFullAcidTables, CreateTableType.FULL_ACID);
+    createTables(originalMMAcidTables, CreateTableType.MM_ACID);
+
+    // Replicate and verify if only 2 tables are replicated to target.
+    String replPolicy = primaryDbName + ".['t1', 't4']";
+    String oldReplPolicy = null;
+    String[] replicatedTables = new String[] {"t1", "t4" };
+    String lastReplId = replicateAndVerify(replPolicy, null, null, null, 
replicatedTables);
+
+    // Exclude t4 and include t3, t6
+    createTables(new String[] {"t6" }, CreateTableType.MM_ACID);
+    oldReplPolicy = replPolicy;
+    replPolicy = primaryDbName + ".['t1', 't3', 't6']";
+    replicatedTables = new String[] {"t1", "t3", "t6" };
+    String[] bootstrappedTables = new String[] {"t3", "t6" };
+    lastReplId = replicateAndVerify(replPolicy, oldReplPolicy, lastReplId,
+            null, null, bootstrappedTables, replicatedTables);
+
+    // Convert to Full Db repl policy. All tables should be included.
+    oldReplPolicy = replPolicy;
+    replPolicy = primaryDbName;
+    replicatedTables = new String[] {"t1", "t2", "t3", "t4", "t5", "t6" };
+    bootstrappedTables = new String[] {"t2", "t4", "t5" };
+    replicateAndVerify(replPolicy, oldReplPolicy, lastReplId,
+            null, null, bootstrappedTables, replicatedTables);
+
+    // Convert to regex that excludes t3, t4 and t5.
+    oldReplPolicy = replPolicy;
+    replPolicy = primaryDbName + ".['.*?'].['t[3-5]+']";
+    replicatedTables = new String[] {"t1", "t2", "t6" };
+    bootstrappedTables = new String[]{};
+    replicateAndVerify(replPolicy, oldReplPolicy, lastReplId,
+            null, null, bootstrappedTables, replicatedTables);
+  }
+
+  @Test
+  public void testReplacePolicyOnBootstrapAcidTablesIncrementalPhase() throws 
Throwable {
+    String[] originalNonAcidTables = new String[] {"a1", "b1", "c1" };
+    String[] originalFullAcidTables = new String[] {"a2", "b2" };
+    String[] originalMMAcidTables = new String[] {"a3", "a4" };
+    createTables(originalNonAcidTables, CreateTableType.NON_ACID);
+    createTables(originalFullAcidTables, CreateTableType.FULL_ACID);
+    createTables(originalMMAcidTables, CreateTableType.MM_ACID);
+
+    // Replicate and verify if only non-acid tables are replicated to target.
+    List<String> dumpWithoutAcidClause = Collections.singletonList(
+            "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='false'");
+    String replPolicy = primaryDbName + ".['a[0-9]+', 'b[0-9]+'].['b1']";
+    String[] bootstrapReplicatedTables = new String[] {"a1" };
+    String lastReplId = replicateAndVerify(replPolicy, null,
+            dumpWithoutAcidClause, null, bootstrapReplicatedTables);
+
+    // Enable acid tables for replication. Also, replace the replication policy 
to exclude "b1" and "a3"
+    // instead of "a1" alone.
+    String oldReplPolicy = replPolicy;
+    replPolicy = primaryDbName + ".['[a-z]+[0-9]+'].['a3', 'b1']";
+    List<String> dumpWithAcidBootstrapClause = Arrays.asList(
+            "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'",
+            "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES + "'='true'");
+    String[] incrementalReplicatedTables = new String[] {"a1", "a2", "a4", 
"b2", "c1" };
+    String[] bootstrappedTables = new String[] {"a2", "a4", "b2", "c1" };
+    replicateAndVerify(replPolicy, oldReplPolicy, lastReplId,
+            dumpWithAcidBootstrapClause, null, bootstrappedTables, 
incrementalReplicatedTables);
+  }
+
+  @Test
+  public void testReplacePolicyWhenAcidTablesDisabledForRepl() throws 
Throwable {
+    String[] originalNonAcidTables = new String[] {"a1", "b1", "c1" };
+    String[] originalFullAcidTables = new String[] {"a2" };
+    createTables(originalNonAcidTables, CreateTableType.NON_ACID);
+    createTables(originalFullAcidTables, CreateTableType.FULL_ACID);
+
+    // Replicate and verify if only non-acid tables are replicated to target.
+    List<String> dumpWithoutAcidClause = Collections.singletonList(
+            "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='false'");
+    String replPolicy = primaryDbName + ".['a[0-9]+', 'b[0-9]+'].['b1']";
+    String[] bootstrapReplicatedTables = new String[] {"a1" };
+    String lastReplId = replicateAndVerify(replPolicy, null,
+            dumpWithoutAcidClause, null, bootstrapReplicatedTables);
+
+    // Continue to disable ACID tables for replication. Also, replace the 
replication policy to include
+    // "a2" but exclude "a1" and "b1". Still ACID tables shouldn't be 
bootstrapped. Only non-ACID
+    // table "b1" should be bootstrapped.
+    String oldReplPolicy = replPolicy;
+    replPolicy = primaryDbName + ".['a[0-9]+', 'b[0-9]+'].['a2']";
+    String[] incrementalReplicatedTables = new String[] {"a1", "b1" };
+    String[] bootstrappedTables = new String[] {"b1" };
+    lastReplId = replicateAndVerify(replPolicy, oldReplPolicy, lastReplId,
+            dumpWithoutAcidClause, null, bootstrappedTables, 
incrementalReplicatedTables);
+  }
+
+  @Test
+  public void testReplacePolicyOnBootstrapExternalTablesIncrementalPhase() 
throws Throwable {
+    String[] originalAcidTables = new String[] {"a1", "b1" };
+    String[] originalExternalTables = new String[] {"a2", "b2", "c2" };
+    createTables(originalAcidTables, CreateTableType.FULL_ACID);
+    createTables(originalExternalTables, CreateTableType.EXTERNAL);
+
+    // Bootstrap should exclude external tables.
+    List<String> loadWithClause = 
ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, 
replica);
+    List<String> dumpWithClause = Collections.singletonList(
+            "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + 
"'='false'"
+    );
+    String replPolicy = primaryDbName + ".['a[0-9]+', 'b1'].['a1']";
+    String[] bootstrapReplicatedTables = new String[] {"b1" };
+    String lastReplId = replicateAndVerify(replPolicy, null,
+            dumpWithClause, loadWithClause, bootstrapReplicatedTables);
+
+    // Continue to disable external tables for replication. Also, replace the 
replication policy to exclude
+    // "b1" and include "a1".
+    String oldReplPolicy = replPolicy;
+    replPolicy = primaryDbName + ".['a[0-9]+', 'b[0-9]+'].['a2', 'b1']";
+    String[] incrementalReplicatedTables = new String[] {"a1" };
+    String[] bootstrappedTables = new String[] {"a1" };
+    lastReplId = replicateAndVerify(replPolicy, oldReplPolicy, lastReplId,
+            dumpWithClause, loadWithClause, bootstrappedTables, 
incrementalReplicatedTables);
+
+    // Enable external tables replication and bootstrap in incremental phase. 
Also, replace the
+    // replication policy to exclude tables with prefix "b".
+    oldReplPolicy = replPolicy;
+    replPolicy = primaryDbName + ".['[a-z]+[0-9]+'].['b[0-9]+']";
+    incrementalReplicatedTables = new String[] {"a1", "a2", "c2" };
+    bootstrappedTables = new String[] {"a2", "c2" };
+    dumpWithClause = Arrays.asList("'" + 
HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
+            "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + 
"'='true'");
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+            .dump(replPolicy, oldReplPolicy, lastReplId, dumpWithClause);
+
+    // the _external_tables_file info should be created as external tables are 
to be replicated.
+    Assert.assertTrue(primary.miniDFSCluster.getFileSystem()
+            .exists(new Path(tuple.dumpLocation, FILE_NAME)));
+
+    // Verify that the external table info contains table "a2" and "c2".
+    ReplicationTestUtils.assertExternalFileInfo(primary, Arrays.asList("a2", 
"c2"),
+            new Path(tuple.dumpLocation, FILE_NAME));
+
+    // Verify if the expected tables are bootstrapped.
+    verifyBootstrapDirInIncrementalDump(tuple.dumpLocation, 
bootstrappedTables);
+
+    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(incrementalReplicatedTables);
+  }
 }
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index cdf9071..6326bc3 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -265,6 +265,18 @@ public class WarehouseInstance implements Closeable {
     return dump(dumpCommand);
   }
 
+  Tuple dump(String replPolicy, String oldReplPolicy, String 
lastReplicationId, List<String> withClauseOptions)
+          throws Throwable {
+    String dumpCommand =
+            "REPL DUMP " + replPolicy
+                    + (oldReplPolicy == null ? "" : " REPLACE " + 
oldReplPolicy)
+                    + (lastReplicationId == null ? "" : " FROM " + 
lastReplicationId);
+    if (!withClauseOptions.isEmpty()) {
+      dumpCommand += " with (" + StringUtils.join(withClauseOptions, ",") + 
")";
+    }
+    return dump(dumpCommand);
+  }
+
   Tuple dump(String dumpCommand) throws Throwable {
     advanceDumpDir();
     run(dumpCommand);
@@ -303,8 +315,6 @@ public class WarehouseInstance implements Closeable {
     if ((withClauseOptions != null) && !withClauseOptions.isEmpty()) {
       replLoadCmd += " WITH (" + StringUtils.join(withClauseOptions, ",") + 
")";
     }
-    run("EXPLAIN " + replLoadCmd);
-    printOutput();
     return run(replLoadCmd);
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 453929a..000d663 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -134,7 +134,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> 
implements Serializable {
     for (String s : values) {
       LOG.debug("    > " + s);
     }
-    Utils.writeOutput(values, new Path(work.resultTempPath), conf);
+    Utils.writeOutput(Collections.singletonList(values), new 
Path(work.resultTempPath), conf);
   }
 
   /**
@@ -143,31 +143,69 @@ public class ReplDumpTask extends Task<ReplDumpWork> 
implements Serializable {
    * locations.
    * 2. External or ACID tables are being bootstrapped for the first time : so 
that we can dump
    * those tables as a whole.
-   * @return
+   * 3. If replication policy is changed/replaced, then need to examine all 
the tables to see if
+   * any of them need to be bootstrapped as old policy doesn't include it but 
new one does.
+   * @return true if need to examine tables for dump and false if not.
+   */
+  private boolean shouldExamineTablesToDump() {
+    return (work.oldReplScope != null)
+            || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)
+            || conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES);
+  }
+
+  /**
+   * Decide whether to dump external tables data. If external tables are 
enabled for replication,
+   * then their data needs to be dumped in all the incremental dumps.
+   * @return true if need to dump external table data and false if not.
    */
   private boolean shouldDumpExternalTableLocation() {
     return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)
-            && (!conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)
-            || 
conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES));
+            && !conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY);
   }
 
-  private boolean shouldExamineTablesToDump() {
-    return shouldDumpExternalTableLocation() ||
-            conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES);
+  /**
+   * Decide whether an external table needs to be bootstrap dumped.
+   * @param tableName - Name of external table to be replicated
+   * @return true if need to bootstrap dump external table and false if not.
+   */
+  private boolean shouldBootstrapDumpExternalTable(String tableName) {
+    // Note: If repl policy is replaced, then need to dump external tables if 
table is getting replicated
+    // for the first time in current dump. So, need to check if table is 
included in old policy.
+    return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)
+            && 
(conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES)
+            || !ReplUtils.tableIncludedInReplScope(work.oldReplScope, 
tableName));
+  }
+
+  /**
+   * Decide whether an ACID table needs to be bootstrap dumped.
+   * @param tableName - Name of ACID table to be replicated
+   * @return true if need to bootstrap dump ACID table and false if not.
+   */
+  private boolean shouldBootstrapDumpAcidTable(String tableName) {
+    // Note: If repl policy is replaced, then need to dump ACID tables if 
table is getting replicated
+    // for the first time in current dump. So, need to check if table is 
included in old policy.
+    return ReplUtils.includeAcidTableInDump(conf)
+            && (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)
+            || !ReplUtils.tableIncludedInReplScope(work.oldReplScope, 
tableName));
   }
 
   private boolean shouldBootstrapDumpTable(Table table) {
-    if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES) &&
-            TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
+    // Note: If control reaches here, it means, table is already included in 
new replication policy.
+    if (TableType.EXTERNAL_TABLE.equals(table.getTableType())
+            && shouldBootstrapDumpExternalTable(table.getTableName())) {
       return true;
     }
 
-    if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES) &&
-           AcidUtils.isTransactionalTable(table)) {
+    if (AcidUtils.isTransactionalTable(table)
+            && shouldBootstrapDumpAcidTable(table.getTableName())) {
       return true;
     }
 
-    return false;
+    // If replication policy is changed with new included/excluded tables 
list, then tables which
+    // are not included in old policy but included in new policy should be 
bootstrapped along with
+    // the current incremental replication dump.
+    // Control reaches here only for non-ACID tables.
+    return !ReplUtils.tableIncludedInReplScope(work.oldReplScope, 
table.getTableName());
   }
 
   private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, 
Hive hiveDb) throws Exception {
@@ -181,7 +219,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> 
implements Serializable {
     // bootstrap (See bootstrapDump() for more details. Only difference here 
is instead of
     // waiting for the concurrent transactions to finish, we start dumping the 
incremental events
     // and wait only for the remaining time if any.
-    if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)) {
+    if (needBootstrapAcidTablesDuringIncrementalDump()) {
       bootDumpBeginReplId = 
queryState.getConf().getLong(ReplUtils.LAST_REPL_ID_KEY, -1L);
       assert (bootDumpBeginReplId >= 0);
       LOG.info("Dump for bootstrapping ACID tables during an incremental dump 
for db {}",
@@ -233,24 +271,23 @@ public class ReplDumpTask extends Task<ReplDumpWork> 
implements Serializable {
     replLogger.endLog(lastReplId.toString());
 
     LOG.info("Done dumping events, preparing to return {},{}", 
dumpRoot.toUri(), lastReplId);
-    Utils.writeOutput(
-        Arrays.asList(
-            "incremental",
-            String.valueOf(work.eventFrom),
-            String.valueOf(lastReplId)
-        ),
-        dmd.getDumpFilePath(), conf);
     dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot);
-    dmd.write();
 
-    // If required wait more for any transactions open at the time of starting 
the ACID bootstrap.
-    if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)) {
-      assert (waitUntilTime > 0);
-      validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime);
+    // If repl policy is changed (oldReplScope is set), then pass the current 
replication policy,
+    // so that REPL LOAD would drop the tables which are not included in 
current policy.
+    if (work.oldReplScope != null) {
+      dmd.setReplScope(work.replScope);
     }
+    dmd.write();
 
     // Examine all the tables if required.
     if (shouldExamineTablesToDump()) {
+      // If required wait more for any transactions open at the time of 
starting the ACID bootstrap.
+      if (needBootstrapAcidTablesDuringIncrementalDump()) {
+        assert (waitUntilTime > 0);
+        validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime);
+      }
+
       Path dbRoot = getBootstrapDbRoot(dumpRoot, dbName, true);
 
       try (Writer writer = new Writer(dumpRoot, conf)) {
@@ -259,8 +296,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> 
implements Serializable {
             Table table = hiveDb.getTable(dbName, tableName);
 
             // Dump external table locations if required.
-            if (shouldDumpExternalTableLocation() &&
-                    TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
+            if (TableType.EXTERNAL_TABLE.equals(table.getTableType())
+                  && shouldDumpExternalTableLocation()) {
               writer.dataLocationDump(table);
             }
 
@@ -282,6 +319,13 @@ public class ReplDumpTask extends Task<ReplDumpWork> 
implements Serializable {
     return lastReplId;
   }
 
+  private boolean needBootstrapAcidTablesDuringIncrementalDump() {
+    // If old replication policy is available, then it is possible some of the 
ACID tables might be
+    // included for bootstrap during incremental dump.
+    return (ReplUtils.includeAcidTableInDump(conf)
+            && ((work.oldReplScope != null) || 
conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)));
+  }
+
   private Path getBootstrapDbRoot(Path dumpRoot, String dbName, boolean 
isIncrementalPhase) {
     if (isIncrementalPhase) {
       dumpRoot = new Path(dumpRoot, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME);
@@ -296,7 +340,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> 
implements Serializable {
         db,
         conf,
         getNewEventOnlyReplicationSpec(ev.getEventId()),
-        work.replScope
+        work.replScope,
+        work.oldReplScope
     );
     EventHandler eventHandler = EventHandlerFactory.handlerFor(ev);
     eventHandler.handle(context);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
index 247066c..7bae9ac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
@@ -30,6 +30,7 @@ import java.io.Serializable;
     Explain.Level.EXTENDED })
 public class ReplDumpWork implements Serializable {
   final ReplScope replScope;
+  final ReplScope oldReplScope;
   final String dbNameOrPattern, astRepresentationForErrorMsg, resultTempPath;
   final Long eventFrom;
   Long eventTo;
@@ -40,10 +41,11 @@ public class ReplDumpWork implements Serializable {
     testInjectDumpDir = dumpDir;
   }
 
-  public ReplDumpWork(ReplScope replScope,
+  public ReplDumpWork(ReplScope replScope, ReplScope oldReplScope,
                       Long eventFrom, Long eventTo, String 
astRepresentationForErrorMsg, Integer maxEventLimit,
                       String resultTempPath) {
     this.replScope = replScope;
+    this.oldReplScope = oldReplScope;
     this.dbNameOrPattern = replScope.getDbName();
     this.eventFrom = eventFrom;
     this.eventTo = eventTo;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 7d2c7c9..e95fe1f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -17,10 +17,12 @@
  */
 package org.apache.hadoop.hive.ql.exec.repl;
 
+import com.google.common.collect.Collections2;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.repl.ReplScope;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.InvalidInputException;
@@ -53,6 +55,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
@@ -364,6 +367,35 @@ public class ReplLoadTask extends Task<ReplLoadWork> 
implements Serializable {
     }
   }
 
+  /**
+   * If replication policy is changed between previous and current load, then 
the excluded tables in
+   * the new replication policy will be dropped.
+   * @throws HiveException Failed to get/drop the tables.
+   */
+  private void dropTablesExcludedInReplScope(ReplScope replScope) throws 
HiveException {
+    // If all tables are included in replication scope, then nothing to be 
dropped.
+    if ((replScope == null) || replScope.includeAllTables()) {
+      return;
+    }
+
+    Hive db = getHive();
+    String dbName = replScope.getDbName();
+
+    // List all the tables that are excluded in the current repl scope.
+    Iterable<String> tableNames = Collections2.filter(db.getAllTables(dbName),
+        tableName -> {
+          assert(tableName != null);
+          return !tableName.toLowerCase().startsWith(
+                  SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase())
+                  && !replScope.tableIncludedInReplScope(tableName);
+        });
+    for (String table : tableNames) {
+      db.dropTable(dbName + "." + table, true);
+    }
+    LOG.info("Tables in the Database: {} that are excluded in the replication 
scope are dropped.",
+            dbName);
+  }
+
   private void createEndReplLogTask(Context context, Scope scope,
                                     ReplLogger replLogger) throws 
SemanticException {
     Map<String, String> dbProps;
@@ -457,6 +489,10 @@ public class ReplLoadTask extends Task<ReplLoadWork> 
implements Serializable {
         work.needCleanTablesFromBootstrap = false;
       }
 
+      // If replication policy is changed between previous and current repl 
load, then drop the tables
+      // that are excluded in the new replication policy.
+      dropTablesExcludedInReplScope(work.currentReplScope);
+
       IncrementalLoadTasksBuilder builder = work.incrementalLoadTasksBuilder();
 
       // If incremental events are already applied, then check and perform if 
need to bootstrap any tables.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index f1f764e..1d63cd8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.repl;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.repl.ReplScope;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
 import 
org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator;
@@ -42,6 +43,7 @@ import static 
org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.D
     Explain.Level.EXTENDED })
 public class ReplLoadWork implements Serializable {
   final String dbNameToLoadIn;
+  final ReplScope currentReplScope;
   final String dumpDirectory;
   final String bootstrapDumpToCleanTables;
   boolean needCleanTablesFromBootstrap;
@@ -61,12 +63,19 @@ public class ReplLoadWork implements Serializable {
   */
   final LineageState sessionStateLineageState;
 
-  public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String 
dbNameToLoadIn,
-      LineageState lineageState, boolean isIncrementalDump, Long eventTo,
-      List<DirCopyWork> pathsToCopyIterator) throws IOException {
+  public ReplLoadWork(HiveConf hiveConf, String dumpDirectory,
+                      String dbNameToLoadIn, ReplScope currentReplScope,
+                      LineageState lineageState, boolean isIncrementalDump, 
Long eventTo,
+                      List<DirCopyWork> pathsToCopyIterator) throws 
IOException {
     sessionStateLineageState = lineageState;
     this.dumpDirectory = dumpDirectory;
     this.dbNameToLoadIn = dbNameToLoadIn;
+    this.currentReplScope = currentReplScope;
+
+    // If DB name is changed during REPL LOAD, then set it instead of 
referring to source DB name.
+    if ((currentReplScope != null) && StringUtils.isNotBlank(dbNameToLoadIn)) {
+      currentReplScope.setDbName(dbNameToLoadIn);
+    }
     this.bootstrapDumpToCleanTables = 
hiveConf.get(ReplUtils.REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG);
     this.needCleanTablesFromBootstrap = 
StringUtils.isNotBlank(this.bootstrapDumpToCleanTables);
 
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index 27127e0..2db6073 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.repl.util;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.repl.ReplConst;
+import org.apache.hadoop.hive.common.repl.ReplScope;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
@@ -272,4 +273,8 @@ public class ReplUtils {
 
     return true;
   }
+
+  public static boolean tableIncludedInReplScope(ReplScope replScope, String 
tableName) {
+    return ((replScope == null) || 
replScope.tableIncludedInReplScope(tableName));
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index cfdf180..e023005 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -890,20 +890,28 @@ importStatement
     ;
 
 replDumpStatement
-@init { pushMsg("replication dump statement", state); }
+@init { pushMsg("Replication dump statement", state); }
 @after { popMsg(state); }
       : KW_REPL KW_DUMP
-        (dbName=identifier) (DOT tablePolicy=replTableLevelPolicy)?
+        (dbPolicy=replDbPolicy)
+        (KW_REPLACE oldDbPolicy=replDbPolicy)?
         (KW_FROM (eventId=Number)
           (KW_TO (rangeEnd=Number))?
           (KW_LIMIT (batchSize=Number))?
         )?
         (KW_WITH replConf=replConfigs)?
-    -> ^(TOK_REPL_DUMP $dbName $tablePolicy? ^(TOK_FROM $eventId (TOK_TO 
$rangeEnd)? (TOK_LIMIT $batchSize)?)? $replConf?)
+    -> ^(TOK_REPL_DUMP $dbPolicy ^(TOK_REPLACE $oldDbPolicy)? ^(TOK_FROM 
$eventId (TOK_TO $rangeEnd)? (TOK_LIMIT $batchSize)?)? $replConf?)
+    ;
+
+replDbPolicy
+@init { pushMsg("Repl dump DB replication policy", state); }
+@after { popMsg(state); }
+    :
+      (dbName=identifier) (DOT tablePolicy=replTableLevelPolicy)? -> $dbName 
$tablePolicy?
     ;
 
 replLoadStatement
-@init { pushMsg("replication load statement", state); }
+@init { pushMsg("Replication load statement", state); }
 @after { popMsg(state); }
       : KW_REPL KW_LOAD
         (dbName=identifier)?
@@ -913,21 +921,21 @@ replLoadStatement
       ;
 
 replConfigs
-@init { pushMsg("repl configurations", state); }
+@init { pushMsg("Repl configurations", state); }
 @after { popMsg(state); }
     :
       LPAREN replConfigsList RPAREN -> ^(TOK_REPL_CONFIG replConfigsList)
     ;
 
 replConfigsList
-@init { pushMsg("repl configurations list", state); }
+@init { pushMsg("Repl configurations list", state); }
 @after { popMsg(state); }
     :
       keyValueProperty (COMMA keyValueProperty)* -> ^(TOK_REPL_CONFIG_LIST 
keyValueProperty+)
     ;
 
 replTableLevelPolicy
-@init { pushMsg("replication table level policy definition", state); }
+@init { pushMsg("Replication table level policy definition", state); }
 @after { popMsg(state); }
     :
       ((replTablesIncludeList=replTablesList) (DOT 
replTablesExcludeList=replTablesList)?)
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 4f4a02f..df41a2e 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -59,6 +59,7 @@ import static 
org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_FROM;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_LIMIT;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_NULL;
+import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPLACE;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_CONFIG;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_DUMP;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_LOAD;
@@ -70,6 +71,7 @@ import static 
org.apache.hadoop.hive.ql.parse.HiveParser.TOK_TO;
 public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
   // Replication Scope
   private ReplScope replScope = new ReplScope();
+  private ReplScope oldReplScope = null;
 
   private Long eventFrom;
   private Long eventTo;
@@ -130,12 +132,13 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
     }
   }
 
-  private void setReplDumpTablesList(Tree replTablesNode) throws HiveException 
{
+  private void setReplDumpTablesList(Tree replTablesNode, ReplScope replScope) 
throws HiveException {
     int childCount = replTablesNode.getChildCount();
     assert(childCount <= 2);
 
     // Traverse the children which can be either just include tables list or 
both include
     // and exclude tables lists.
+    String replScopeType = (replScope == this.replScope) ? "Current" : "Old";
     for (int listIdx = 0; listIdx < childCount; listIdx++) {
       Tree tablesListNode = replTablesNode.getChild(listIdx);
       assert(tablesListNode.getType() == TOK_REPL_TABLES_LIST);
@@ -151,21 +154,49 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
       }
 
       if (listIdx == 0) {
-        LOG.info("ReplScope: Set Included Tables List: {}", tablesList);
+        LOG.info("{} ReplScope: Set Included Tables List: {}", replScopeType, 
tablesList);
         replScope.setIncludedTablePatterns(tablesList);
       } else {
-        LOG.info("ReplScope: Set Excluded Tables List: {}", tablesList);
+        LOG.info("{} ReplScope: Set Excluded Tables List: {}", replScopeType, 
tablesList);
         replScope.setExcludedTablePatterns(tablesList);
       }
     }
   }
 
+  private void setOldReplPolicy(Tree oldReplPolicyTree) throws HiveException {
+    oldReplScope = new ReplScope();
+    int childCount = oldReplPolicyTree.getChildCount();
+
+    // First child is DB name and optional second child is tables list.
+    assert(childCount <= 2);
+
+    // First child is always the DB name. So set it.
+    oldReplScope.setDbName(oldReplPolicyTree.getChild(0).getText());
+    LOG.info("Old ReplScope: Set DB Name: {}", oldReplScope.getDbName());
+    if (!oldReplScope.getDbName().equalsIgnoreCase(replScope.getDbName())) {
+      LOG.error("DB name {} cannot be replaced to {} in the replication 
policy.",
+              oldReplScope.getDbName(), replScope.getDbName());
+      throw new SemanticException("DB name cannot be replaced in the 
replication policy.");
+    }
+
+    // If the old policy is just <db_name>, then tables list won't be there.
+    if (childCount <= 1) {
+      return;
+    }
+
+    // Traverse the children which can be either just include tables list or 
both include
+    // and exclude tables lists.
+    Tree oldPolicyTablesListNode = oldReplPolicyTree.getChild(1);
+    assert(oldPolicyTablesListNode.getType() == TOK_REPL_TABLES);
+    setReplDumpTablesList(oldPolicyTablesListNode, oldReplScope);
+  }
+
   private void initReplDump(ASTNode ast) throws HiveException {
     int numChildren = ast.getChildCount();
     boolean isMetaDataOnly = false;
 
     String dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
-    LOG.info("ReplScope: Set DB Name: {}", dbNameOrPattern);
+    LOG.info("Current ReplScope: Set DB Name: {}", dbNameOrPattern);
     replScope.setDbName(dbNameOrPattern);
 
     // Skip the first node, which is always required
@@ -185,7 +216,11 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
           break;
         }
         case TOK_REPL_TABLES: {
-          setReplDumpTablesList(currNode);
+          setReplDumpTablesList(currNode, replScope);
+          break;
+        }
+        case TOK_REPLACE: {
+          setOldReplPolicy(currNode);
           break;
         }
         case TOK_FROM: {
@@ -247,6 +282,7 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
       Task<ReplDumpWork> replDumpWorkTask = TaskFactory
           .get(new ReplDumpWork(
               replScope,
+              oldReplScope,
               eventFrom,
               eventTo,
               ErrorMsg.INVALID_PATH.getMsg(ast),
@@ -413,6 +449,7 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
         LOG.debug("{} contains an bootstrap dump", loadPath);
       }
       ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), 
replScope.getDbName(),
+              dmd.getReplScope(),
               queryState.getLineageState(), evDump, dmd.getEventTo(),
           dirLocationsToCopy(loadPath, evDump));
       rootTasks.add(TaskFactory.get(replLoadWork, conf));
@@ -511,6 +548,6 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
       LOG.debug("    > " + s);
     }
     ctx.setResFile(ctx.getLocalTmpPath());
-    Utils.writeOutput(values, ctx.getResFile(), conf);
+    Utils.writeOutput(Collections.singletonList(values), ctx.getResFile(), 
conf);
   }
 }
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index c2e26f0..4500fb6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -169,7 +169,7 @@ public class TableExport {
   }
 
   private boolean shouldExport() {
-    return Utils.shouldReplicate(replicationSpec, tableSpec.tableHandle, 
false, conf);
+    return Utils.shouldReplicate(replicationSpec, tableSpec.tableHandle, 
false, null, conf);
   }
 
   /**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index ce5ef06..11a4d62 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -59,18 +59,20 @@ public class Utils {
     IDLE, ACTIVE
   }
 
-  public static void writeOutput(List<String> values, Path outputFile, 
HiveConf hiveConf)
+  public static void writeOutput(List<List<String>> listValues, Path 
outputFile, HiveConf hiveConf)
       throws SemanticException {
     DataOutputStream outStream = null;
     try {
       FileSystem fs = outputFile.getFileSystem(hiveConf);
       outStream = fs.create(outputFile);
-      outStream.writeBytes((values.get(0) == null ? Utilities.nullStringOutput 
: values.get(0)));
-      for (int i = 1; i < values.size(); i++) {
-        outStream.write(Utilities.tabCode);
-        outStream.writeBytes((values.get(i) == null ? 
Utilities.nullStringOutput : values.get(i)));
+      for (List<String> values : listValues) {
+        outStream.writeBytes((values.get(0) == null ? 
Utilities.nullStringOutput : values.get(0)));
+        for (int i = 1; i < values.size(); i++) {
+          outStream.write(Utilities.tabCode);
+          outStream.writeBytes((values.get(i) == null ? 
Utilities.nullStringOutput : values.get(i)));
+        }
+        outStream.write(Utilities.newLineCode);
       }
-      outStream.write(Utilities.newLineCode);
     } catch (IOException e) {
       throw new SemanticException(e);
     } finally {
@@ -175,7 +177,7 @@ public class Utils {
    * specific checks.
    */
   public static boolean shouldReplicate(ReplicationSpec replicationSpec, Table 
tableHandle,
-                                        boolean isEventDump, HiveConf 
hiveConf) {
+                                        boolean isEventDump, ReplScope 
oldReplScope, HiveConf hiveConf) {
     if (replicationSpec == null) {
       replicationSpec = new ReplicationSpec();
     }
@@ -199,8 +201,10 @@ public class Utils {
                 || replicationSpec.isMetadataOnly();
         if (isEventDump) {
           // Skip dumping of events related to external tables if bootstrap is 
enabled on it.
+          // Also, skip if current table is included only in new policy but 
not in old policy.
           shouldReplicateExternalTables = shouldReplicateExternalTables
-                  && 
!hiveConf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES);
+                  && 
!hiveConf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES)
+                  && ReplUtils.tableIncludedInReplScope(oldReplScope, 
tableHandle.getTableName());
         }
         return shouldReplicateExternalTables;
       }
@@ -210,17 +214,30 @@ public class Utils {
           return false;
         }
 
-        // Skip dumping events related to ACID tables if bootstrap is enabled 
on it
+        // Skip dumping events related to ACID tables if bootstrap is enabled 
on it.
+        // Also, skip if current table is included only in new policy but not 
in old policy.
         if (isEventDump) {
-          return 
!hiveConf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES);
+          return 
!hiveConf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)
+                  && ReplUtils.tableIncludedInReplScope(oldReplScope, 
tableHandle.getTableName());
         }
       }
+
+      // If replication policy is replaced with new included/excluded tables 
list, then events
+      // corresponding to tables which are not included in old policy but 
included in new policy
+      // should be skipped. Those tables would be bootstrapped along with the 
current incremental
+      // replication dump.
+      // Note: If any event dump reaches here, it means the table is included 
in the new replication policy.
+      if (isEventDump && !ReplUtils.tableIncludedInReplScope(oldReplScope, 
tableHandle.getTableName())) {
+        return false;
+      }
     }
     return true;
   }
 
   public static boolean shouldReplicate(NotificationEvent tableForEvent,
-      ReplicationSpec replicationSpec, Hive db, boolean isEventDump, HiveConf 
hiveConf) {
+                                        ReplicationSpec replicationSpec, Hive 
db,
+                                        boolean isEventDump, ReplScope 
oldReplScope,
+                                        HiveConf hiveConf) {
     Table table;
     try {
       table = db.getTable(tableForEvent.getDbName(), 
tableForEvent.getTableName());
@@ -230,7 +247,7 @@ public class Utils {
               .getTableName(), e);
       return false;
     }
-    return shouldReplicate(replicationSpec, table, isEventDump, hiveConf);
+    return shouldReplicate(replicationSpec, table, isEventDump, oldReplScope, 
hiveConf);
   }
 
   static List<Path> getDataPathList(Path fromPath, ReplicationSpec 
replicationSpec, HiveConf conf)
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java
index d938cc1..be6574d 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractConstraintEventHandler.java
@@ -32,6 +32,7 @@ abstract class AbstractConstraintEventHandler<T extends 
EventMessage> extends Ab
         withinContext.replicationSpec,
         withinContext.db,
             true,
+        withinContext.oldReplScope,
         withinContext.hiveConf
     );
   }
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
index 0756f59..b6d2d61 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
@@ -67,7 +67,8 @@ class AddPartitionHandler extends AbstractEventHandler {
     }
 
     final Table qlMdTable = new Table(tobj);
-    if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true, 
withinContext.hiveConf)) {
+    if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable,
+            true, withinContext.oldReplScope,  withinContext.hiveConf)) {
       return;
     }
 
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
index bd25a6c..a940102 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
@@ -49,6 +49,15 @@ class AllocWriteIdHandler extends 
AbstractEventHandler<AllocWriteIdMessage> {
       return;
     }
 
+    // If replication policy is replaced with new included/excluded tables 
list, then events
+    // corresponding to tables which are not included in old policy but 
included in new policy
+    // should be skipped. Those tables would be bootstrapped along with the 
current incremental
+    // replication dump.
+    // Note: If any event dump reaches here, it means the table is included in 
the new replication policy.
+    if (!ReplUtils.tableIncludedInReplScope(withinContext.oldReplScope, 
eventMessage.getTableName())) {
+      return;
+    }
+
     DumpMetaData dmd = withinContext.createDmd(this);
     dmd.setPayload(eventMessageAsJSON);
     dmd.write();
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
index e59bdf6..7d6599b 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterPartitionHandler.java
@@ -100,7 +100,8 @@ class AlterPartitionHandler extends 
AbstractEventHandler<AlterPartitionMessage>
     }
 
     Table qlMdTable = new Table(tableObject);
-    if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true, 
withinContext.hiveConf)) {
+    if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable,
+            true, withinContext.oldReplScope,  withinContext.hiveConf)) {
       return;
     }
 
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
index f02cbb8..89bd0bf 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AlterTableHandler.java
@@ -88,7 +88,8 @@ class AlterTableHandler extends 
AbstractEventHandler<AlterTableMessage> {
 
     Table qlMdTableBefore = new Table(before);
     if (!Utils
-        .shouldReplicate(withinContext.replicationSpec, qlMdTableBefore, true, 
withinContext.hiveConf)) {
+        .shouldReplicate(withinContext.replicationSpec, qlMdTableBefore,
+                true, withinContext.oldReplScope, withinContext.hiveConf)) {
       return;
     }
 
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
index 995d634..a13fe38 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
@@ -113,7 +113,13 @@ class CommitTxnHandler extends 
AbstractEventHandler<CommitTxnMessage> {
             : new ArrayList<>(Collections2.filter(writeEventInfoList,
               writeEventInfo -> {
                 assert(writeEventInfo != null);
-                return 
withinContext.replScope.tableIncludedInReplScope(writeEventInfo.getTable());
+                // If replication policy is replaced with new 
included/excluded tables list, then events
+                // corresponding to tables which are included in both old and 
new policies should be dumped.
+                // If table is included in new policy but not in old policy, 
then it should be skipped.
+                // Those tables would be bootstrapped along with the current 
incremental
+                // replication dump.
+                return 
(ReplUtils.tableIncludedInReplScope(withinContext.replScope, 
writeEventInfo.getTable())
+                        && 
ReplUtils.tableIncludedInReplScope(withinContext.oldReplScope, 
writeEventInfo.getTable()));
               })));
   }
 
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
index 8a838db..be7f7eb 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
@@ -54,7 +54,8 @@ class CreateTableHandler extends 
AbstractEventHandler<CreateTableMessage> {
 
     Table qlMdTable = new Table(tobj);
 
-    if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true, 
withinContext.hiveConf)) {
+    if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable,
+            true, withinContext.oldReplScope, withinContext.hiveConf)) {
       return;
     }
 
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
index 0d60c31..b75d12b 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
@@ -41,15 +41,17 @@ public interface EventHandler {
     final HiveConf hiveConf;
     final ReplicationSpec replicationSpec;
     final ReplScope replScope;
+    final ReplScope oldReplScope;
 
     public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf,
-        ReplicationSpec replicationSpec, ReplScope replScope) {
+        ReplicationSpec replicationSpec, ReplScope replScope, ReplScope 
oldReplScope) {
       this.eventRoot = eventRoot;
       this.cmRoot = cmRoot;
       this.db = db;
       this.hiveConf = hiveConf;
       this.replicationSpec = replicationSpec;
       this.replScope = replScope;
+      this.oldReplScope = oldReplScope;
     }
 
     public Context(Context other) {
@@ -59,6 +61,7 @@ public interface EventHandler {
       this.hiveConf = other.hiveConf;
       this.replicationSpec = other.replicationSpec;
       this.replScope = other.replScope;
+      this.oldReplScope = other.oldReplScope;
     }
 
     void setEventRoot(Path eventRoot) {
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
index 1bcd529..116db5b 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
@@ -58,7 +58,8 @@ class InsertHandler extends 
AbstractEventHandler<InsertMessage> {
       withinContext.replicationSpec.setNoop(true);
     }
 
-    if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true, 
withinContext.hiveConf)) {
+    if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable,
+            true, withinContext.oldReplScope, withinContext.hiveConf)) {
       return;
     }
 
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java
index 54fc7a6..23d088d 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java
@@ -52,8 +52,8 @@ class UpdatePartColStatHandler extends 
AbstractEventHandler<UpdatePartitionColum
       return;
     }
 
-    if (!Utils.shouldReplicate(withinContext.replicationSpec, new 
Table(tableObj), true,
-                              withinContext.hiveConf)) {
+    if (!Utils.shouldReplicate(withinContext.replicationSpec, new 
Table(tableObj),
+            true, withinContext.oldReplScope, withinContext.hiveConf)) {
       return;
     }
 
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java
index 62db959..ab314ed 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java
@@ -37,7 +37,8 @@ class UpdateTableColStatHandler extends 
AbstractEventHandler<UpdateTableColumnSt
   public void handle(Context withinContext) throws Exception {
     LOG.info("Processing#{} UpdateTableColumnStat message : {}", 
fromEventId(), eventMessageAsJSON);
     Table qlMdTable = new Table(eventMessage.getTableObject());
-    if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable, true, 
withinContext.hiveConf)) {
+    if (!Utils.shouldReplicate(withinContext.replicationSpec, qlMdTable,
+            true, withinContext.oldReplScope, withinContext.hiveConf)) {
       return;
     }
 
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
index 552183a..bef4780 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
@@ -52,7 +52,7 @@ public class TableSerializer implements JsonWriter.Serializer 
{
   @Override
   public void writeTo(JsonWriter writer, ReplicationSpec 
additionalPropertiesProvider)
       throws SemanticException, IOException {
-    if (!Utils.shouldReplicate(additionalPropertiesProvider, tableHandle, 
false, hiveConf)) {
+    if (!Utils.shouldReplicate(additionalPropertiesProvider, tableHandle, 
false, null, hiveConf)) {
       return;
     }
 
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
index 974e105..4c137eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
@@ -19,32 +19,38 @@ package org.apache.hadoop.hive.ql.parse.repl.load;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.repl.ReplScope;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 public class DumpMetaData {
   // wrapper class for reading and writing metadata about a dump
   // responsible for _dumpmetadata files
   private static final String DUMP_METADATA = "_dumpmetadata";
+  private static final Logger LOG = 
LoggerFactory.getLogger(DumpMetaData.class);
 
   private DumpType dumpType;
   private Long eventFrom = null;
   private Long eventTo = null;
+  private Path cmRoot;
   private String payload = null;
-  private boolean initialized = false;
+  private ReplScope replScope = null;
 
+  private boolean initialized = false;
   private final Path dumpFile;
   private final HiveConf hiveConf;
-  private Path cmRoot;
 
   public DumpMetaData(Path dumpRoot, HiveConf hiveConf) {
     this.hiveConf = hiveConf;
@@ -61,8 +67,66 @@ public class DumpMetaData {
     this.dumpType = lvl;
     this.eventFrom = eventFrom;
     this.eventTo = eventTo;
-    this.initialized = true;
     this.cmRoot = cmRoot;
+    this.initialized = true;
+  }
+
+  public void setPayload(String payload) {
+    this.payload = payload;
+  }
+
+  public void setReplScope(ReplScope replScope) {
+    this.replScope = replScope;
+  }
+
+  private void readReplScope(String line) throws IOException {
+    if (line == null) {
+      return;
+    }
+
+    String[] lineContents = line.split("\t");
+    if (lineContents.length < 1) {
+      return;
+    }
+
+    replScope = new ReplScope();
+
+    LOG.info("Read ReplScope: Set Db Name: {}.", lineContents[0]);
+    replScope.setDbName(lineContents[0]);
+
+    // Read/set include and exclude tables list.
+    int idx = readReplScopeTablesList(lineContents, 1, true);
+    readReplScopeTablesList(lineContents, idx, false);
+  }
+
+  private int readReplScopeTablesList(String[] lineContents, int startIdx, 
boolean includeList)
+          throws IOException {
+    // If the list doesn't exist, then return.
+    if (startIdx >= lineContents.length) {
+      return startIdx;
+    }
+
+    // Each tables list should start with "{" and ends with "}"
+    if (!"{".equals(lineContents[startIdx])) {
+      throw new IOException("Invalid repl tables list data in dump metadata 
file. Missing \"{\".");
+    }
+
+    List<String>tableNames = new ArrayList<>();
+    for (int i = (startIdx + 1); i < lineContents.length; i++) {
+      String value = lineContents[i];
+      if ("}".equals(value)) {
+        if (includeList) {
+          LOG.info("Read ReplScope: Set Include Table Names: {}.", tableNames);
+          replScope.setIncludedTablePatterns(tableNames);
+        } else {
+          LOG.info("Read ReplScope: Set Exclude Table Names: {}.", tableNames);
+          replScope.setExcludedTablePatterns(tableNames);
+        }
+        return (i + 1);
+      }
+      tableNames.add(value);
+    }
+    throw new IOException("Invalid repl tables list data in dump metadata 
file. Missing \"}\".");
   }
 
   private void loadDumpFromFile() throws SemanticException {
@@ -71,7 +135,7 @@ public class DumpMetaData {
       // read from dumpfile and instantiate self
       FileSystem fs = dumpFile.getFileSystem(hiveConf);
       br = new BufferedReader(new InputStreamReader(fs.open(dumpFile)));
-      String line = null;
+      String line;
       if ((line = br.readLine()) != null) {
         String[] lineContents = line.split("\t", 5);
         setDump(DumpType.valueOf(lineContents[0]), 
Long.valueOf(lineContents[1]),
@@ -82,6 +146,7 @@ public class DumpMetaData {
         throw new IOException(
             "Unable to read valid values from dumpFile:" + 
dumpFile.toUri().toString());
       }
+      readReplScope(br.readLine());
     } catch (IOException ioe) {
       throw new SemanticException(ioe);
     } finally {
@@ -105,10 +170,6 @@ public class DumpMetaData {
     return this.payload;
   }
 
-  public void setPayload(String payload) {
-    this.payload = payload;
-  }
-
   public Long getEventFrom() throws SemanticException {
     initializeIfNot();
     return eventFrom;
@@ -119,6 +180,10 @@ public class DumpMetaData {
     return eventTo;
   }
 
+  public ReplScope getReplScope() throws SemanticException {
+    initializeIfNot();
+    return replScope;
+  }
   public Path getDumpFilePath() {
     return dumpFile;
   }
@@ -134,17 +199,42 @@ public class DumpMetaData {
     }
   }
 
+  private List<String> prepareReplScopeValues() {
+    assert(replScope != null);
+
+    List<String> values = new ArrayList<>();
+    values.add(replScope.getDbName());
+
+    List<String> includedTableNames = replScope.getIncludedTableNames();
+    List<String> excludedTableNames = replScope.getExcludedTableNames();
+    if (includedTableNames != null) {
+      values.add("{");
+      values.addAll(includedTableNames);
+      values.add("}");
+    }
+    if (excludedTableNames != null) {
+      values.add("{");
+      values.addAll(excludedTableNames);
+      values.add("}");
+    }
+    LOG.info("Preparing ReplScope {} to dump.", values);
+    return values;
+  }
 
   public void write() throws SemanticException {
-    Utils.writeOutput(
+    List<List<String>> listValues = new ArrayList<>();
+    listValues.add(
         Arrays.asList(
             dumpType.toString(),
             eventFrom.toString(),
             eventTo.toString(),
             cmRoot.toString(),
-            payload),
-        dumpFile,
-        hiveConf
+            payload)
+    );
+    if (replScope != null) {
+      listValues.add(prepareReplScopeValues());
+    }
+    Utils.writeOutput(listValues, dumpFile, hiveConf
     );
   }
 }
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
index a412d43..aacd295 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
@@ -139,7 +139,7 @@ public class TestReplDumpTask {
 
     task.initialize(queryState, null, null, null);
     task.setWork(
-        new ReplDumpWork(replScope,
+        new ReplDumpWork(replScope, null,
             Long.MAX_VALUE, Long.MAX_VALUE, "",
             Integer.MAX_VALUE, "")
     );
diff --git 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java
 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java
index b8c95d5..14b58a3 100644
--- 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java
+++ 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplScope.java
@@ -29,8 +29,12 @@ import java.util.regex.Pattern;
 public class ReplScope implements Serializable {
   private String dbName;
   private Pattern dbNamePattern;
-  private List<Pattern> includedTableNamePatterns; // Only for REPL DUMP and 
exist only if tableName == null.
-  private List<Pattern> excludedTableNamePatterns; // Only for REPL DUMP and 
exist only if tableName == null.
+
+  // Include and exclude table names/patterns exist only for REPL DUMP.
+  private List<String> includedTableNames;
+  private List<String> excludedTableNames;
+  private List<Pattern> includedTableNamePatterns;
+  private List<Pattern> excludedTableNamePatterns;
 
   public ReplScope() {
   }
@@ -49,12 +53,22 @@ public class ReplScope implements Serializable {
     return dbName;
   }
 
-  public void setIncludedTablePatterns(List<String> includedTableNamePatterns) 
{
-    this.includedTableNamePatterns = 
compilePatterns(includedTableNamePatterns);
+  public void setIncludedTablePatterns(List<String> includedTableNames) {
+    this.includedTableNames = includedTableNames;
+    this.includedTableNamePatterns = compilePatterns(includedTableNames);
+  }
+
+  public List<String> getIncludedTableNames() {
+    return includedTableNames;
+  }
+
+  public void setExcludedTablePatterns(List<String> excludedTableNames) {
+    this.excludedTableNames = excludedTableNames;
+    this.excludedTableNamePatterns = compilePatterns(excludedTableNames);
   }
 
-  public void setExcludedTablePatterns(List<String> excludedTableNamePatterns) 
{
-    this.excludedTableNamePatterns = 
compilePatterns(excludedTableNamePatterns);
+  public List<String> getExcludedTableNames() {
+    return excludedTableNames;
   }
 
   public boolean includeAllTables() {

Reply via email to