This is an automated email from the ASF dual-hosted git repository.

sankarh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 7efa482  HIVE-21281: Repl checkpointing doesn't work when retry 
bootstrap load with partitions of external tables (Sankar Hariappan, reviewed 
by Mahesh Kumar Behera)
7efa482 is described below

commit 7efa48202d236064d6e7d24275600eaaf098ff8d
Author: Sankar Hariappan <[email protected]>
AuthorDate: Mon Feb 18 19:58:48 2019 +0530

    HIVE-21281: Repl checkpointing doesn't work when retry bootstrap load with 
partitions of external tables (Sankar Hariappan, reviewed by Mahesh Kumar 
Behera)
    
    Signed-off-by: Sankar Hariappan <[email protected]>
---
 .../TestReplicationScenariosAcrossInstances.java   | 19 +--------------
 .../TestReplicationScenariosExternalTables.java    |  6 +++++
 .../hadoop/hive/ql/parse/WarehouseInstance.java    | 27 ++++++++++++++++++++++
 .../repl/bootstrap/load/table/LoadPartitions.java  | 21 +++++++++--------
 4 files changed, 46 insertions(+), 27 deletions(-)

diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 1adec4e..3639ab1 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -865,23 +865,6 @@ public class TestReplicationScenariosAcrossInstances 
extends BaseReplicationAcro
     assertFalse(fs.exists(cSerdesTableDumpLocation));
   }
 
-  private void verifyIfCkptSet(WarehouseInstance wh, String dbName, String 
dumpDir) throws Exception {
-    Database db = wh.getDatabase(replicatedDbName);
-    verifyIfCkptSet(db.getParameters(), dumpDir);
-
-    List<String> tblNames = wh.getAllTables(dbName);
-    for (String tblName : tblNames) {
-      Table tbl = wh.getTable(dbName, tblName);
-      verifyIfCkptSet(tbl.getParameters(), dumpDir);
-      if (tbl.getPartitionKeysSize() != 0) {
-        List<Partition> partitions = wh.getAllPartitions(dbName, tblName);
-        for (Partition ptn : partitions) {
-          verifyIfCkptSet(ptn.getParameters(), dumpDir);
-        }
-      }
-    }
-  }
-
   @Test
   public void testShouldDumpMetaDataForNonNativeTableIfSetMeataDataOnly() 
throws Throwable {
     String tableName = testName.getMethodName() + "_table";
@@ -1166,7 +1149,7 @@ public class TestReplicationScenariosAcrossInstances 
extends BaseReplicationAcro
         .verifyResults(Collections.singletonList("10"))
             .run("select country from t2 order by country")
             .verifyResults(Arrays.asList("india", "uk", "us"));
-    verifyIfCkptSet(replica, replicatedDbName, tuple.dumpLocation);
+    replica.verifyIfCkptSet(replicatedDbName, tuple.dumpLocation);
 
     WarehouseInstance.Tuple tuple_2 = primary
             .run("use " + primaryDbName)
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index 2cdc35f..83f38fa 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -147,6 +147,9 @@ public class TestReplicationScenariosExternalTables extends 
BaseReplicationAcros
         .run("select country from t2 where country = 'france'")
         .verifyResult("france");
 
+    // Ckpt should be set on bootstrapped db.
+    replica.verifyIfCkptSet(replicatedDbName, tuple.dumpLocation);
+
     assertTablePartitionLocation(primaryDbName + ".t1", replicatedDbName + 
".t1");
     assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + 
".t2");
 
@@ -511,6 +514,9 @@ public class TestReplicationScenariosExternalTables extends 
BaseReplicationAcros
             .run("show tables like 't4'")
             .verifyResult("t4");
 
+    // Ckpt should be set on bootstrapped tables.
+    replica.verifyIfCkptSetForTables(replicatedDbName, Arrays.asList("t2", 
"t3"), tuple.dumpLocation);
+
     // Drop source tables to see if target points to correct data or not after 
bootstrap load.
     primary.run("use " + primaryDbName)
             .run("drop table t2")
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index e0547d4..bd3a557 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.ql.DriverFactory;
 import org.apache.hadoop.hive.ql.IDriver;
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -361,6 +362,32 @@ public class WarehouseInstance implements Closeable {
     }
   }
 
+  private void verifyIfCkptSet(Map<String, String> props, String dumpDir) {
+    assertTrue(props.containsKey(ReplUtils.REPL_CHECKPOINT_KEY));
+    assertTrue(props.get(ReplUtils.REPL_CHECKPOINT_KEY).equals(dumpDir));
+  }
+
+  public void verifyIfCkptSet(String dbName, String dumpDir) throws Exception {
+    Database db = getDatabase(dbName);
+    verifyIfCkptSet(db.getParameters(), dumpDir);
+
+    List<String> tblNames = getAllTables(dbName);
+    verifyIfCkptSetForTables(dbName, tblNames, dumpDir);
+  }
+
+  public void verifyIfCkptSetForTables(String dbName, List<String> tblNames, 
String dumpDir) throws Exception {
+    for (String tblName : tblNames) {
+      Table tbl = getTable(dbName, tblName);
+      verifyIfCkptSet(tbl.getParameters(), dumpDir);
+      if (tbl.getPartitionKeysSize() != 0) {
+        List<Partition> partitions = getAllPartitions(dbName, tblName);
+        for (Partition ptn : partitions) {
+          verifyIfCkptSet(ptn.getParameters(), dumpDir);
+        }
+      }
+    }
+  }
+
   public Database getDatabase(String dbName) throws Exception {
     try {
       return client.getDatabase(dbName);
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index 65b7aa0..fa72527 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -199,12 +199,22 @@ public class LoadPartitions {
             context.hiveConf
     );
 
+    Task<?> ckptTask = ReplUtils.getTableCheckpointTask(
+            tableDesc,
+            (HashMap<String, String>)partSpec.getPartSpec(),
+            context.dumpDirectory,
+            context.hiveConf
+    );
+
     boolean isOnlyDDLOperation = event.replicationSpec().isMetadataOnly()
         || (TableType.EXTERNAL_TABLE.equals(table.getTableType())
         && !event.replicationSpec().isMigratingToExternalTable()
     );
 
     if (isOnlyDDLOperation) {
+      // Set Checkpoint task as dependent to add partition tasks. So, if same 
dump is retried for
+      // bootstrap, we skip current partition update.
+      addPartTask.addDependentTask(ckptTask);
       if (ptnRootTask == null) {
         ptnRootTask = addPartTask;
       } else {
@@ -246,21 +256,14 @@ public class LoadPartitions {
       movePartitionTask = movePartitionTask(table, partSpec, stagingDir, 
loadFileType);
     }
 
-    // Set Checkpoint task as dependant to add partition tasks. So, if same 
dump is retried for
-    // bootstrap, we skip current partition update.
-    Task<?> ckptTask = ReplUtils.getTableCheckpointTask(
-            tableDesc,
-            (HashMap<String, String>)partSpec.getPartSpec(),
-            context.dumpDirectory,
-            context.hiveConf
-    );
-
     if (ptnRootTask == null) {
       ptnRootTask = copyTask;
     } else {
       ptnRootTask.addDependentTask(copyTask);
     }
 
+    // Set Checkpoint task as dependent to the tail of add partition tasks. 
So, if same dump is
+    // retried for bootstrap, we skip current partition update.
     copyTask.addDependentTask(addPartTask);
     if (movePartitionTask != null) {
       addPartTask.addDependentTask(movePartitionTask);

Reply via email to