This is an automated email from the ASF dual-hosted git repository.
sankarh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 7efa482 HIVE-21281: Repl checkpointing doesn't work when retry
bootstrap load with partitions of external tables (Sankar Hariappan, reviewed
by Mahesh Kumar Behera)
7efa482 is described below
commit 7efa48202d236064d6e7d24275600eaaf098ff8d
Author: Sankar Hariappan <[email protected]>
AuthorDate: Mon Feb 18 19:58:48 2019 +0530
HIVE-21281: Repl checkpointing doesn't work when retry bootstrap load with
partitions of external tables (Sankar Hariappan, reviewed by Mahesh Kumar
Behera)
Signed-off-by: Sankar Hariappan <[email protected]>
---
.../TestReplicationScenariosAcrossInstances.java | 19 +--------------
.../TestReplicationScenariosExternalTables.java | 6 +++++
.../hadoop/hive/ql/parse/WarehouseInstance.java | 27 ++++++++++++++++++++++
.../repl/bootstrap/load/table/LoadPartitions.java | 21 +++++++++--------
4 files changed, 46 insertions(+), 27 deletions(-)
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 1adec4e..3639ab1 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -865,23 +865,6 @@ public class TestReplicationScenariosAcrossInstances
extends BaseReplicationAcro
assertFalse(fs.exists(cSerdesTableDumpLocation));
}
- private void verifyIfCkptSet(WarehouseInstance wh, String dbName, String
dumpDir) throws Exception {
- Database db = wh.getDatabase(replicatedDbName);
- verifyIfCkptSet(db.getParameters(), dumpDir);
-
- List<String> tblNames = wh.getAllTables(dbName);
- for (String tblName : tblNames) {
- Table tbl = wh.getTable(dbName, tblName);
- verifyIfCkptSet(tbl.getParameters(), dumpDir);
- if (tbl.getPartitionKeysSize() != 0) {
- List<Partition> partitions = wh.getAllPartitions(dbName, tblName);
- for (Partition ptn : partitions) {
- verifyIfCkptSet(ptn.getParameters(), dumpDir);
- }
- }
- }
- }
-
@Test
public void testShouldDumpMetaDataForNonNativeTableIfSetMeataDataOnly()
throws Throwable {
String tableName = testName.getMethodName() + "_table";
@@ -1166,7 +1149,7 @@ public class TestReplicationScenariosAcrossInstances
extends BaseReplicationAcro
.verifyResults(Collections.singletonList("10"))
.run("select country from t2 order by country")
.verifyResults(Arrays.asList("india", "uk", "us"));
- verifyIfCkptSet(replica, replicatedDbName, tuple.dumpLocation);
+ replica.verifyIfCkptSet(replicatedDbName, tuple.dumpLocation);
WarehouseInstance.Tuple tuple_2 = primary
.run("use " + primaryDbName)
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index 2cdc35f..83f38fa 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -147,6 +147,9 @@ public class TestReplicationScenariosExternalTables extends
BaseReplicationAcros
.run("select country from t2 where country = 'france'")
.verifyResult("france");
+ // Ckpt should be set on bootstrapped db.
+ replica.verifyIfCkptSet(replicatedDbName, tuple.dumpLocation);
+
assertTablePartitionLocation(primaryDbName + ".t1", replicatedDbName +
".t1");
assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName +
".t2");
@@ -511,6 +514,9 @@ public class TestReplicationScenariosExternalTables extends
BaseReplicationAcros
.run("show tables like 't4'")
.verifyResult("t4");
+ // Ckpt should be set on bootstrapped tables.
+ replica.verifyIfCkptSetForTables(replicatedDbName, Arrays.asList("t2",
"t3"), tuple.dumpLocation);
+
// Drop source tables to see if target points to correct data or not after
bootstrap load.
primary.run("use " + primaryDbName)
.run("drop table t2")
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index e0547d4..bd3a557 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.ql.DriverFactory;
import org.apache.hadoop.hive.ql.IDriver;
import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -361,6 +362,32 @@ public class WarehouseInstance implements Closeable {
}
}
+ private void verifyIfCkptSet(Map<String, String> props, String dumpDir) {
+ assertTrue(props.containsKey(ReplUtils.REPL_CHECKPOINT_KEY));
+ assertTrue(props.get(ReplUtils.REPL_CHECKPOINT_KEY).equals(dumpDir));
+ }
+
+ public void verifyIfCkptSet(String dbName, String dumpDir) throws Exception {
+ Database db = getDatabase(dbName);
+ verifyIfCkptSet(db.getParameters(), dumpDir);
+
+ List<String> tblNames = getAllTables(dbName);
+ verifyIfCkptSetForTables(dbName, tblNames, dumpDir);
+ }
+
+ public void verifyIfCkptSetForTables(String dbName, List<String> tblNames,
String dumpDir) throws Exception {
+ for (String tblName : tblNames) {
+ Table tbl = getTable(dbName, tblName);
+ verifyIfCkptSet(tbl.getParameters(), dumpDir);
+ if (tbl.getPartitionKeysSize() != 0) {
+ List<Partition> partitions = getAllPartitions(dbName, tblName);
+ for (Partition ptn : partitions) {
+ verifyIfCkptSet(ptn.getParameters(), dumpDir);
+ }
+ }
+ }
+ }
+
public Database getDatabase(String dbName) throws Exception {
try {
return client.getDatabase(dbName);
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index 65b7aa0..fa72527 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -199,12 +199,22 @@ public class LoadPartitions {
context.hiveConf
);
+ Task<?> ckptTask = ReplUtils.getTableCheckpointTask(
+ tableDesc,
+ (HashMap<String, String>)partSpec.getPartSpec(),
+ context.dumpDirectory,
+ context.hiveConf
+ );
+
boolean isOnlyDDLOperation = event.replicationSpec().isMetadataOnly()
|| (TableType.EXTERNAL_TABLE.equals(table.getTableType())
&& !event.replicationSpec().isMigratingToExternalTable()
);
if (isOnlyDDLOperation) {
+ // Set Checkpoint task as dependent to add partition tasks. So, if same
dump is retried for
+ // bootstrap, we skip current partition update.
+ addPartTask.addDependentTask(ckptTask);
if (ptnRootTask == null) {
ptnRootTask = addPartTask;
} else {
@@ -246,21 +256,14 @@ public class LoadPartitions {
movePartitionTask = movePartitionTask(table, partSpec, stagingDir,
loadFileType);
}
- // Set Checkpoint task as dependant to add partition tasks. So, if same
dump is retried for
- // bootstrap, we skip current partition update.
- Task<?> ckptTask = ReplUtils.getTableCheckpointTask(
- tableDesc,
- (HashMap<String, String>)partSpec.getPartSpec(),
- context.dumpDirectory,
- context.hiveConf
- );
-
if (ptnRootTask == null) {
ptnRootTask = copyTask;
} else {
ptnRootTask.addDependentTask(copyTask);
}
+ // Set Checkpoint task as dependent to the tail of add partition tasks.
So, if same dump is
+ // retried for bootstrap, we skip current partition update.
copyTask.addDependentTask(addPartTask);
if (movePartitionTask != null) {
addPartTask.addDependentTask(movePartitionTask);