HIVE-19435: Incremental replication causes data loss if a table is dropped followed by create and insert-into with a different partition type (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Thejas M Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/91b66c5c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/91b66c5c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/91b66c5c

Branch: refs/heads/branch-3.0.0
Commit: 91b66c5c5a6106d42903385b7a49e6b32400cc39
Parents: 11a7164
Author: Sankar Hariappan <sank...@apache.org>
Authored: Sun May 13 13:02:25 2018 +0530
Committer: Sankar Hariappan <sank...@apache.org>
Committed: Sun May 13 13:02:25 2018 +0530

----------------------------------------------------------------------
 ...TestReplicationScenariosAcrossInstances.java | 58 ++++++++++++++++++++
 .../hive/ql/parse/ImportSemanticAnalyzer.java   | 38 ++++++++++---
 2 files changed, 88 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
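Before the diffs, a minimal sketch of the trigger, distilled from the new test below and reusing its WarehouseInstance fluent helper. The table name t1 and the variables bootstrapReplId/first are placeholders for this sketch only; the full test covers all three partition-layout combinations.

    // Minimal trigger sketch (placeholders: primary, primaryDbName, bootstrapReplId).
    // A partitioned table is dropped and re-created unpartitioned within a single
    // incremental window; the insert after the re-create is the row that was lost.
    WarehouseInstance.Tuple first = primary.run("use " + primaryDbName)
            .run("create table t1 (id int) partitioned by (country string)")
            .run("insert into t1 partition(country='india') values(1)")
            .dump(primaryDbName, bootstrapReplId);          // first incremental dump

    primary.run("use " + primaryDbName)
            .run("drop table t1")
            .run("create table t1 (id int)")                // same name, no partitions
            .run("insert into t1 values (10)")
            .dump(primaryDbName, first.lastReplicationId);  // second incremental dump

Per the JIRA summary, replaying these two dumps in order on the replica is the sequence that previously lost data.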


http://git-wip-us.apache.org/repos/asf/hive/blob/91b66c5c/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 70e1aa7..df9bde0 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -675,4 +675,62 @@ public class TestReplicationScenariosAcrossInstances {
             .run("select id from table2 order by id")
             .verifyResults(new String[] {"2"});
   }
+
+  @Test
+  public void testIncrementalReplWithDropAndCreateTableDifferentPartitionTypeAndInsert() throws Throwable {
+    // Bootstrap dump with empty db
+    WarehouseInstance.Tuple bootstrapTuple = primary.dump(primaryDbName, null);
+
+    // Bootstrap load in replica
+    replica.load(replicatedDbName, bootstrapTuple.dumpLocation)
+            .status(replicatedDbName)
+            .verifyResult(bootstrapTuple.lastReplicationId);
+
+    // First incremental dump
+    WarehouseInstance.Tuple firstIncremental = primary.run("use " + primaryDbName)
+            .run("create table table1 (id int) partitioned by (country string)")
+            .run("create table table2 (id int)")
+            .run("create table table3 (id int) partitioned by (country string)")
+            .run("insert into table1 partition(country='india') values(1)")
+            .run("insert into table2 values(2)")
+            .run("insert into table3 partition(country='india') values(3)")
+            .dump(primaryDbName, bootstrapTuple.lastReplicationId);
+
+    // Second incremental dump
+    WarehouseInstance.Tuple secondIncremental = primary.run("use " + primaryDbName)
+            .run("drop table table1")
+            .run("drop table table2")
+            .run("drop table table3")
+            .run("create table table1 (id int)")
+            .run("insert into table1 values (10)")
+            .run("create table table2 (id int) partitioned by (country string)")
+            .run("insert into table2 partition(country='india') values(20)")
+            .run("create table table3 (id int) partitioned by (name string, rank int)")
+            .run("insert into table3 partition(name='adam', rank=100) values(30)")
+            .dump(primaryDbName, firstIncremental.lastReplicationId);
+
+    // First incremental load
+    replica.load(replicatedDbName, firstIncremental.dumpLocation)
+            .status(replicatedDbName)
+            .verifyResult(firstIncremental.lastReplicationId)
+            .run("use " + replicatedDbName)
+            .run("select id from table1")
+            .verifyResults(new String[] {"1"})
+            .run("select * from table2")
+            .verifyResults(new String[] {"2"})
+            .run("select id from table3")
+            .verifyResults(new String[] {"3"});
+
+    // Second incremental load
+    replica.load(replicatedDbName, secondIncremental.dumpLocation)
+            .status(replicatedDbName)
+            .verifyResult(secondIncremental.lastReplicationId)
+            .run("use " + replicatedDbName)
+            .run("select * from table1")
+            .verifyResults(new String[] {"10"})
+            .run("select id from table2")
+            .verifyResults(new String[] {"20"})
+            .run("select id from table3")
+            .verifyResults(new String[] {"30"});
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/91b66c5c/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index e6a7012..eb6708b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -898,14 +898,14 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     }
   }
 
-  private static Table createNewTableMetadataObject(ImportTableDesc tblDesk)
+  private static Table createNewTableMetadataObject(ImportTableDesc tblDesc)
       throws SemanticException {
-    Table newTable = new Table(tblDesk.getDatabaseName(), tblDesk.getTableName());
+    Table newTable = new Table(tblDesc.getDatabaseName(), tblDesc.getTableName());
     //so that we know the type of table we are creating: acid/MM to match what was exported
-    newTable.setParameters(tblDesk.getTblProps());
-    if(tblDesk.isExternal() && AcidUtils.isTransactionalTable(newTable)) {
+    newTable.setParameters(tblDesc.getTblProps());
+    if(tblDesc.isExternal() && AcidUtils.isTransactionalTable(newTable)) {
       throw new SemanticException("External tables may not be transactional: " +
-          Warehouse.getQualifiedName(tblDesk.getDatabaseName(), tblDesk.getTableName()));
+          Warehouse.getQualifiedName(tblDesc.getDatabaseName(), tblDesc.getTableName()));
     }
     return newTable;
   }
@@ -1027,14 +1027,36 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         x.getTasks().add(t);
       }
     } else {
+      // If the table of the current event has a partition flag different from the existing
+      // table, then some previous event in the same batch must have dropped and re-created
+      // a table with the same name but a different partition flag. In that case, go with the
+      // current event's table type and create a dummy table object for adding repl tasks.
+      boolean isOldTableValid = true;
+      if (table.isPartitioned() != isPartitioned(tblDesc)) {
+        table = createNewTableMetadataObject(tblDesc);
+        isOldTableValid = false;
+      }
+
       // Table existed, and is okay to replicate into, not dropping and re-creating.
-      if (table.isPartitioned()) {
+      if (isPartitioned(tblDesc)) {
         x.getLOG().debug("table partitioned");
         for (AddPartitionDesc addPartitionDesc : partitionDescs) {
           addPartitionDesc.setReplicationSpec(replicationSpec);
           Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
           org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
-          if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
+          if (isOldTableValid) {
+            // If the existing table is valid but the partition spec is different, then
+            // ignore partition validation and create a new partition.
+            try {
+              ptn = x.getHive().getPartition(table, partSpec, false);
+            } catch (HiveException ex) {
+              ptn = null;
+              table = createNewTableMetadataObject(tblDesc);
+              isOldTableValid = false;
+            }
+          }
+
+          if (ptn == null) {
             if (!replicationSpec.isMetadataOnly()){
               x.getTasks().add(addSinglePartition(
                   fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm));
@@ -1079,7 +1101,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         x.getLOG().debug("table non-partitioned");
         if (!replicationSpec.isMetadataOnly()) {
           // repl-imports are replace-into unless the event is insert-into
-          loadTable(fromURI, table, replicationSpec.isReplace(), table.getDataLocation(),
+          loadTable(fromURI, table, replicationSpec.isReplace(), new Path(tblDesc.getLocation()),
             replicationSpec, x, writeId, stmtId, isSourceMm);
         } else {
           x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec));
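For readers who want the fix's core decision outside diff context, here is a self-contained sketch. TableMeta and ReplLoadSketch are hypothetical illustration types, not Hive classes; only the partition-flag comparison mirrors the patch.

    // Hypothetical minimal type; the real code works with Hive's Table and ImportTableDesc.
    final class TableMeta {
      final String name;
      final boolean partitioned;
      TableMeta(String name, boolean partitioned) {
        this.name = name;
        this.partitioned = partitioned;
      }
    }

    final class ReplLoadSketch {
      // Mirrors the new guard in ImportSemanticAnalyzer: if the event's descriptor and
      // the existing table object disagree on partitioning, the table must have been
      // dropped and re-created earlier in the same batch, so the stale object cannot
      // be trusted and the event's metadata wins.
      static TableMeta chooseTableObject(TableMeta existing, TableMeta fromEvent) {
        if (existing.partitioned != fromEvent.partitioned) {
          return fromEvent; // analogous to table = createNewTableMetadataObject(tblDesc)
        }
        return existing;    // analogous to keeping isOldTableValid = true
      }

      public static void main(String[] args) {
        TableMeta existing = new TableMeta("table1", true);  // pre-drop: partitioned
        TableMeta event = new TableMeta("table1", false);    // re-created: unpartitioned
        System.out.println(chooseTableObject(existing, event) == event); // prints true
      }
    }

The companion change in the non-partitioned branch, loading from tblDesc.getLocation() rather than table.getDataLocation(), appears to follow the same principle: after a drop-and-recreate within the batch, the event's descriptor, not the pre-existing table object, reflects the table actually being loaded.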
