This is an automated email from the ASF dual-hosted git repository.

anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new a712f4b  HIVE-24502: Store table level regular expression used during dump for table level replication (Aasha Medhi, reviewed by Pravin Kumar Sinha)
a712f4b is described below

commit a712f4b048cd8aa66f3692a3f08b62c14b278353
Author: Anishek Agarwal <anis...@gmail.com>
AuthorDate: Mon Jan 4 09:57:45 2021 +0530

    HIVE-24502: Store table level regular expression used during dump for table level replication (Aasha Medhi, reviewed by Pravin Kumar Sinha)
---
 .../hive/ql/parse/TestReplicationScenarios.java    |  4 ++--
 .../parse/TestTableLevelReplicationScenarios.java  | 23 ++++++++++++++++++++++
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     | 12 +++++------
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |  4 +++-
 .../hadoop/hive/ql/exec/repl/ReplLoadWork.java     |  5 ++++-
 .../hive/ql/parse/ReplicationSemanticAnalyzer.java |  2 +-
 .../hive/ql/parse/repl/load/DumpMetaData.java      | 19 +++++++++++++-----
 7 files changed, 53 insertions(+), 16 deletions(-)
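
A note for readers skimming the diff below: with this change, REPL DUMP always persists the replication policy (including any table-level regular expressions) in the dump metadata, along with a new boolean recording whether the policy changed since the previous dump. The first line of the dump metadata file therefore grows from six to seven tab-separated fields. The following is a minimal parsing sketch, not actual Hive code; the field order mirrors DumpMetaData.setDump()/write() in the diff, but the sample line and values are made up:

    // Illustrative sketch only; the sample line and values are hypothetical.
    String line = "INCREMENTAL\t100\t200\thdfs://nn:8020/cmroot\t1\t<payload>\ttrue";
    String[] lineContents = line.split("\t", 7);
    String dumpType = lineContents[0];                       // BOOTSTRAP or INCREMENTAL
    long eventFrom = Long.parseLong(lineContents[1]);        // first event id in the dump
    long eventTo = Long.parseLong(lineContents[2]);          // last event id in the dump
    String cmRoot = lineContents[3];                         // change-management root path
    long dumpExecutionId = Long.parseLong(lineContents[4]);  // scheduled-query execution id
    String payload = lineContents[5];                        // null marker when absent
    boolean replScopeModified = Boolean.parseBoolean(lineContents[6]); // new in this patch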

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index ab81834..4b54525 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -331,7 +331,7 @@ public class TestReplicationScenarios {
    * appropriately. This tests bootstrap behaviour primarily.
    */
   @Test
-  public void testBasic() throws IOException {
+  public void testBasic() throws IOException, SemanticException {
     String name = testName.getMethodName();
     String dbName = createDB(name, driver);
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", 
driver);
@@ -487,7 +487,7 @@ public class TestReplicationScenarios {
     }
     ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, loadPath.toString(), sourceDb, replicadb,
             null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId),
-        0L, metricCollector);
+        0L, metricCollector, false);
     Task replLoadTask = TaskFactory.get(replLoadWork, confTemp);
     replLoadTask.initialize(null, null, new TaskQueue(driver.getContext()), driver.getContext());
     replLoadTask.executeTask(null);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
index 9d5e8af..4d47254 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import org.junit.Assert;
@@ -46,6 +47,7 @@ import java.io.InputStreamReader;
 
 import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME;
 import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME;
+import static org.junit.Assert.assertEquals;
 
 /**
  * Tests Table level replication scenarios.
@@ -156,6 +158,10 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
 
     WarehouseInstance.Tuple tuple = primary.dump(replPolicy, oldReplPolicy, dumpWithClause);
 
+    DumpMetaData dumpMetaData = new DumpMetaData(new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR), conf);
+    Assert.assertEquals(oldReplPolicy != null && !replPolicy.equals(oldReplPolicy),
+      dumpMetaData.isReplScopeModified());
+
     if (bootstrappedTables != null) {
       verifyBootstrapDirInIncrementalDump(tuple.dumpLocation, bootstrappedTables);
     }
@@ -163,6 +169,8 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
     // If the policy contains '.'' means its table level replication.
     verifyTableListForPolicy(tuple.dumpLocation, replPolicy.contains(".'") ? expectedTables : null);
 
+    verifyDumpMetadata(replPolicy, new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR));
+
     replica.load(replicatedDbName, replPolicy, loadWithClause)
             .run("use " + replicatedDbName)
             .run("show tables")
@@ -180,6 +188,21 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
     return tuple.lastReplicationId;
   }
 
+  private void verifyDumpMetadata(String replPolicy, Path dumpPath) throws SemanticException {
+    String[] parseReplPolicy = replPolicy.split("\\.'");
+    assertEquals(parseReplPolicy[0], new DumpMetaData(dumpPath, conf).getReplScope().getDbName());
+    if (parseReplPolicy.length > 1) {
+      parseReplPolicy[1] = parseReplPolicy[1].replaceAll("'", "");
+      assertEquals(parseReplPolicy[1],
+        new DumpMetaData(dumpPath, conf).getReplScope().getIncludedTableNames());
+    }
+    if (parseReplPolicy.length > 2) {
+      parseReplPolicy[2] = parseReplPolicy[2].replaceAll("'", "");
+      assertEquals(parseReplPolicy[2],
+        new DumpMetaData(dumpPath, conf).getReplScope().getExcludedTableNames());
+    }
+  }
+
   private String replicateAndVerifyClearDump(String replPolicy, String oldReplPolicy, String lastReplId,
                                     List<String> dumpWithClause,
                                     List<String> loadWithClause,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 1fce791..7e690fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
 import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.orc.ExternalCache;
 import org.apache.hadoop.hive.ql.lockmgr.DbLockManager;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
@@ -604,12 +603,11 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     replLogger.endLog(lastReplId.toString());
     LOG.info("Done dumping events, preparing to return {},{}", 
dumpRoot.toUri(), lastReplId);
     long executionId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
-    dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot, executionId);
+    dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot, executionId,
+      work.oldReplScope != null);
     // If repl policy is changed (oldReplScope is set), then pass the current replication policy,
     // so that REPL LOAD would drop the tables which are not included in current policy.
-    if (work.oldReplScope != null) {
-      dmd.setReplScope(work.replScope);
-    }
+    dmd.setReplScope(work.replScope);
     dmd.write(true);
     int cacheSize = conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE);
     try (FileList managedTblList = createTableFileList(dumpRoot, EximUtil.FILE_LIST, cacheSize);
@@ -940,7 +938,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       LOG.info("Preparing to return {},{}->{}",
               dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId);
       long executorId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
-      dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot, executorId);
+      dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot, executorId,
+        work.oldReplScope != null);
+      dmd.setReplScope(work.replScope);
       dmd.write(true);
       work.setFunctionCopyPathIterator(functionsBinaryCopyPaths.iterator());
       setDataCopyIterators(extTableFileList, managedTblList);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index e7245bd..2f41673 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -592,7 +592,9 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
   private int executeIncrementalLoad() throws Exception {
     // If replication policy is changed between previous and current repl load, then drop the tables
     // that are excluded in the new replication policy.
-    dropTablesExcludedInReplScope(work.currentReplScope);
+    if (work.replScopeModified) {
+      dropTablesExcludedInReplScope(work.currentReplScope);
+    }
     IncrementalLoadTasksBuilder builder = work.incrementalLoadTasksBuilder();
     // If incremental events are already applied, then check and perform if need to bootstrap any tables.
     if (!builder.hasMoreWork() && work.isLastReplIDUpdated()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index 376fd7c..a52dac2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -58,6 +58,7 @@ public class ReplLoadWork implements Serializable {
   private String sourceDbName;
   private Long dumpExecutionId;
   private final transient ReplicationMetricCollector metricCollector;
+  final boolean replScopeModified;
 
   private final ConstraintEventsIterator constraintsIterator;
   private int loadTaskRunCount = 0;
@@ -78,7 +79,8 @@ public class ReplLoadWork implements Serializable {
                       String sourceDbName, String dbNameToLoadIn, ReplScope currentReplScope,
                       LineageState lineageState, boolean isIncrementalDump, Long eventTo,
                       Long dumpExecutionId,
-                      ReplicationMetricCollector metricCollector) throws IOException, SemanticException {
+                      ReplicationMetricCollector metricCollector,
+                      boolean replScopeModified) throws IOException, SemanticException {
     sessionStateLineageState = lineageState;
     this.dumpDirectory = dumpDirectory;
     this.dbNameToLoadIn = dbNameToLoadIn;
@@ -86,6 +88,7 @@ public class ReplLoadWork implements Serializable {
     this.sourceDbName = sourceDbName;
     this.dumpExecutionId = dumpExecutionId;
     this.metricCollector = metricCollector;
+    this.replScopeModified = replScopeModified;
 
 
     // If DB name is changed during REPL LOAD, then set it instead of referring to source DB name.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 4c10499..ed408b0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -372,7 +372,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
                 dmd.getReplScope(),
                 queryState.getLineageState(), evDump, dmd.getEventTo(), dmd.getDumpExecutionId(),
             initMetricCollection(!evDump, loadPath.toString(), replScope.getDbName(),
-              dmd.getDumpExecutionId()));
+              dmd.getDumpExecutionId()), dmd.isReplScopeModified());
         rootTasks.add(TaskFactory.get(replLoadWork, conf));
       } else {
         LOG.warn("Previous Dump Already Loaded");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
index b6d43f7..c428ea2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/DumpMetaData.java
@@ -52,6 +52,7 @@ public class DumpMetaData {
   private final Path dumpFile;
   private final HiveConf hiveConf;
   private Long dumpExecutionId;
+  private boolean replScopeModified = false;
 
   public DumpMetaData(Path dumpRoot, HiveConf hiveConf) {
     this.hiveConf = hiveConf;
@@ -61,16 +62,18 @@ public class DumpMetaData {
   public DumpMetaData(Path dumpRoot, DumpType lvl, Long eventFrom, Long eventTo, Path cmRoot,
       HiveConf hiveConf) {
     this(dumpRoot, hiveConf);
-    setDump(lvl, eventFrom, eventTo, cmRoot, 0L);
+    setDump(lvl, eventFrom, eventTo, cmRoot, 0L, false);
   }
 
-  public void setDump(DumpType lvl, Long eventFrom, Long eventTo, Path cmRoot, Long dumpExecutionId) {
+  public void setDump(DumpType lvl, Long eventFrom, Long eventTo, Path cmRoot, Long dumpExecutionId,
+                      boolean replScopeModified) {
     this.dumpType = lvl;
     this.eventFrom = eventFrom;
     this.eventTo = eventTo;
     this.cmRoot = cmRoot;
     this.initialized = true;
     this.dumpExecutionId = dumpExecutionId;
+    this.replScopeModified = replScopeModified;
   }
 
   public void setPayload(String payload) {
@@ -117,10 +120,10 @@ public class DumpMetaData {
       br = new BufferedReader(new InputStreamReader(fs.open(dumpFile)));
       String line;
       if ((line = br.readLine()) != null) {
-        String[] lineContents = line.split("\t", 6);
+        String[] lineContents = line.split("\t", 7);
         setDump(DumpType.valueOf(lineContents[0]), Long.valueOf(lineContents[1]),
             Long.valueOf(lineContents[2]),
-            new Path(lineContents[3]), Long.valueOf(lineContents[4]));
+            new Path(lineContents[3]), Long.valueOf(lineContents[4]), Boolean.valueOf(lineContents[6]));
         setPayload(lineContents[5].equals(Utilities.nullStringOutput) ? null : lineContents[5]);
       } else {
         throw new IOException(
@@ -165,6 +168,11 @@ public class DumpMetaData {
     return dumpExecutionId;
   }
 
+  public boolean isReplScopeModified() throws SemanticException {
+    initializeIfNot();
+    return replScopeModified;
+  }
+
   public ReplScope getReplScope() throws SemanticException {
     initializeIfNot();
     return replScope;
@@ -219,7 +227,8 @@ public class DumpMetaData {
             eventTo.toString(),
             cmRoot.toString(),
             dumpExecutionId.toString(),
-            payload)
+            payload,
+            String.valueOf(replScopeModified))
     );
     if (replScope != null) {
       listValues.add(prepareReplScopeValues());
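
Taken together, the flow under this patch is: the dump side records replScopeModified as (work.oldReplScope != null) and now always writes the current policy into the dump metadata, while the load side drops tables excluded by the new policy only when that persisted flag is set, rather than whenever a repl scope happens to be present. The policy string itself uses the REPL DUMP syntax dbName.'includePattern'.'excludePattern', which is what the new verifyDumpMetadata helper above decomposes. A rough illustration of that decomposition, using a hypothetical policy (this mirrors the test's string handling, not the real grammar-based ReplScope parsing):

    String replPolicy = "repldb.'t[0-9]+'.'t100'";   // hypothetical table-level policy
    String[] parts = replPolicy.split("\\.'");       // same split the test helper uses
    String dbName = parts[0];                        // "repldb"
    String included = parts[1].replaceAll("'", "");  // "t[0-9]+", the include regex
    String excluded = parts[2].replaceAll("'", "");  // "t100", the exclude regex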
