Repository: hive
Updated Branches:
  refs/heads/master 244ca8e5c -> d9fae0493


HIVE-19130: NPE is thrown when REPL LOAD applies a drop partition event (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Thejas M Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d9fae049
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d9fae049
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d9fae049

Branch: refs/heads/master
Commit: d9fae049305e20ec8a72e581a2fc938028523402
Parents: 244ca8e
Author: Sankar Hariappan <sank...@apache.org>
Authored: Thu Apr 12 10:29:47 2018 +0530
Committer: Sankar Hariappan <sank...@apache.org>
Committed: Thu Apr 12 10:29:47 2018 +0530

----------------------------------------------------------------------
 ...TestReplicationScenariosAcrossInstances.java | 55 ++++++++++++++++++++
 .../hadoop/hive/ql/parse/WarehouseInstance.java |  5 ++
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |  8 ++-
 .../apache/hadoop/hive/ql/parse/EximUtil.java   | 10 ++++
 .../hive/ql/parse/ImportSemanticAnalyzer.java   | 31 +++++++++--
 .../parse/repl/load/message/TableHandler.java   |  1 +
 6 files changed, 106 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
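
In short, the fix has two cooperating parts: DDLTask now treats a
drop-partition request against a missing table as a no-op, and
ImportSemanticAnalyzer turns a CREATE_TABLE event that lands on an existing
table into a drop-then-create task chain. A minimal sketch, condensed from the
hunks below (not standalone code):

    // DDLTask.dropPartitions(): the table is already gone, so its partitions
    // are gone too; bail out instead of dereferencing null.
    if (tbl == null) {
      return;
    }

    // ImportSemanticAnalyzer: a CREATE_TABLE event on an existing table
    // schedules a drop first and chains the create task behind it.
    if (x.getEventType() == DumpType.EVENT_CREATE_TABLE) {
      dropTblTask = dropTableTask(table, x, replicationSpec);
      table = null;
    }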


http://git-wip-us.apache.org/repos/asf/hive/blob/d9fae049/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 689ca76..70e1aa7 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -620,4 +620,59 @@ public class TestReplicationScenariosAcrossInstances {
             .run("show functions like '" + replicatedDbName + "*'")
             .verifyResult(null);
   }
+
+  @Test
+  public void testIncrementalReplWithEventsBatchHavingDropCreateTable() throws Throwable {
+    // Bootstrap dump with empty db
+    WarehouseInstance.Tuple bootstrapTuple = primary.dump(primaryDbName, null);
+
+    // Bootstrap load in replica
+    replica.load(replicatedDbName, bootstrapTuple.dumpLocation)
+            .status(replicatedDbName)
+            .verifyResult(bootstrapTuple.lastReplicationId);
+
+    // First incremental dump
+    WarehouseInstance.Tuple firstIncremental = primary.run("use " + primaryDbName)
+            .run("create table table1 (i int)")
+            .run("create table table2 (id int) partitioned by (country string)")
+            .run("insert into table1 values (1)")
+            .run("insert into table2 partition(country='india') values(1)")
+            .dump(primaryDbName, bootstrapTuple.lastReplicationId);
+
+    // Second incremental dump
+    WarehouseInstance.Tuple secondIncremental = primary.run("use " + primaryDbName)
+            .run("drop table table1")
+            .run("drop table table2")
+            .run("create table table2 (id int) partitioned by (country string)")
+            .run("alter table table2 add partition(country='india')")
+            .run("alter table table2 drop partition(country='india')")
+            .run("insert into table2 partition(country='us') values(2)")
+            .run("create table table1 (i int)")
+            .run("insert into table1 values (2)")
+            .dump(primaryDbName, firstIncremental.lastReplicationId);
+
+    // First incremental load
+    replica.load(replicatedDbName, firstIncremental.dumpLocation)
+            .status(replicatedDbName)
+            .verifyResult(firstIncremental.lastReplicationId)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[] {"table1", "table2"})
+            .run("select * from table1")
+            .verifyResults(new String[] {"1"})
+            .run("select id from table2 order by id")
+            .verifyResults(new String[] {"1"});
+
+    // Second incremental load
+    replica.load(replicatedDbName, secondIncremental.dumpLocation)
+            .status(replicatedDbName)
+            .verifyResult(secondIncremental.lastReplicationId)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[] {"table1", "table2"})
+            .run("select * from table1")
+            .verifyResults(new String[] {"2"})
+            .run("select id from table2 order by id")
+            .verifyResults(new String[] {"2"});
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/d9fae049/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 7c8020d..accdc1f 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -231,6 +231,11 @@ public class WarehouseInstance implements Closeable {
     return run(replLoadCmd);
   }
 
+  WarehouseInstance status(String replicatedDbName) throws Throwable {
+    String replStatusCmd = "REPL STATUS " + replicatedDbName;
+    return run(replStatusCmd);
+  }
+
   WarehouseInstance status(String replicatedDbName, List<String> withClauseOptions) throws Throwable {
     String replStatusCmd = "REPL STATUS " + replicatedDbName;
     if (!withClauseOptions.isEmpty()) {
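
The new single-argument overload keeps the fluent test chains concise when no
WITH clause is needed; for example, as exercised by the test above:

    // Assert the replica's last replicated event ID right after a load
    // (a sketch reusing names from the new test).
    replica.load(replicatedDbName, bootstrapTuple.dumpLocation)
            .status(replicatedDbName)
            .verifyResult(bootstrapTuple.lastReplicationId);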

http://git-wip-us.apache.org/repos/asf/hive/blob/d9fae049/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index bda2af3..61a0432 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -4522,6 +4522,12 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
       // for dropping. Thus, we need a way to push this filter (replicationSpec.allowEventReplacementInto)
       // to the metastore to allow it to drop a partition or not, depending on a Predicate on the
       // parameter key values.
+
+      if (tbl == null) {
+        // If the table is missing, then its partitions would've been dropped as well. Just no-op.
+        return;
+      }
+
       for (DropTableDesc.PartSpec partSpec : dropTbl.getPartSpecs()){
         List<Partition> partitions = new ArrayList<>();
         try {
@@ -4551,7 +4557,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
       console.printInfo("Dropped the partition " + partition.getName());
       // We have already locked the table, don't lock the partitions.
      addIfAbsentByName(new WriteEntity(partition, WriteEntity.WriteType.DDL_NO_LOCK));
-    };
+    }
   }
 
   private void dropTable(Hive db, Table tbl, DropTableDesc dropTbl) throws HiveException {
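
For illustration, a hypothetical replay sequence showing what the new null
check prevents (a sketch only; the lookup call is an assumption about the
calling context, not part of this hunk):

    // A batched DROP_TABLE event has already removed the table, so the lookup
    // that feeds dropPartitions() yields null rather than a Table instance:
    Table tbl = db.getTable(dropTbl.getTableName(), false);
    // Without the guard, resolving dropTbl.getPartSpecs() against the null
    // table threw the NPE; with it, the stale drop-partition event is a no-op.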

http://git-wip-us.apache.org/repos/asf/hive/blob/d9fae049/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 89837be..0d2fafb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.DBSerializer;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
@@ -92,6 +93,7 @@ public class EximUtil {
     private List<Task<? extends Serializable>> tasks;
     private Logger LOG;
     private Context ctx;
+    private DumpType eventType = DumpType.EVENT_UNKNOWN;
 
     public HiveConf getConf() {
       return conf;
@@ -121,6 +123,14 @@ public class EximUtil {
       return ctx;
     }
 
+    public void setEventType(DumpType eventType) {
+      this.eventType = eventType;
+    }
+
+    public DumpType getEventType() {
+      return eventType;
+    }
+
     public SemanticAnalyzerWrapperContext(HiveConf conf, Hive db,
                                           HashSet<ReadEntity> inputs,
                                           HashSet<WriteEntity> outputs,
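
The new eventType field is set by the replaying message handler and consumed
during import analysis; a condensed sketch of the flow, using names from this
commit's hunks (the TableHandler wiring appears in the last hunk below):

    // In TableHandler: record which dump event produced this metadata.
    x.setEventType(context.dmd.getDumpType());

    // In ImportSemanticAnalyzer: branch on it when the target table exists.
    if (x.getEventType() == DumpType.EVENT_CREATE_TABLE) {
      // drop the existing table first, then re-create it
    }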

http://git-wip-us.apache.org/repos/asf/hive/blob/d9fae049/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 8b639f7..832f660 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -47,12 +47,14 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
 import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.CopyWork;
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.DropTableDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
@@ -438,6 +440,13 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     return tableDesc.getCreateTableTask(x.getInputs(), x.getOutputs(), x.getConf());
   }
 
+  private static Task<?> dropTableTask(Table table, EximUtil.SemanticAnalyzerWrapperContext x,
+                                       ReplicationSpec replicationSpec) {
+    DropTableDesc dropTblDesc = new DropTableDesc(table.getTableName(), table.getTableType(),
+            true, false, replicationSpec);
+    return TaskFactory.get(new DDLWork(x.getInputs(), x.getOutputs(), dropTblDesc), x.getConf());
+  }
+
   private static Task<? extends Serializable> alterTableTask(ImportTableDesc tableDesc,
       EximUtil.SemanticAnalyzerWrapperContext x, ReplicationSpec replicationSpec) {
     tableDesc.setReplaceMode(true);
@@ -912,7 +921,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       UpdatedMetaDataTracker updatedMetadata)
       throws HiveException, URISyntaxException, IOException, MetaException {
 
-    Task<?> dr = null;
+    Task<?> dropTblTask = null;
     WriteEntity.WriteType lockType = WriteEntity.WriteType.DDL_NO_LOCK;
 
     // Normally, on import, trying to create a table or a partition in a db that does not yet exist
@@ -934,6 +943,15 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
                 tblDesc.getDatabaseName(), tblDesc.getTableName());
         return;
       }
+
+      // If the table exists and we found a valid create table event, then we need to drop the table first
+      // and then create it. This case is possible if the event sequence is drop_table(t1) -> create_table(t1).
+      // We need to drop here to handle the case where the previous incremental load created the table but
+      // didn't set the last repl ID due to some failure.
+      if (x.getEventType() == DumpType.EVENT_CREATE_TABLE) {
+        dropTblTask = dropTableTask(table, x, replicationSpec);
+        table = null;
+      }
     } else {
       // If table doesn't exist, allow creating a new one only if the database state is older than the update.
       if ((parentDb != null) && (!replicationSpec.allowReplacementInto(parentDb.getParameters()))) {
@@ -1000,8 +1018,15 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
           t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()), replicationSpec, x, writeId, stmtId, isSourceMm));
         }
       }
-      // Simply create
-      x.getTasks().add(t);
+
+      if (dropTblTask != null) {
+        // Drop first and then create
+        dropTblTask.addDependentTask(t);
+        x.getTasks().add(dropTblTask);
+      } else {
+        // Simply create
+        x.getTasks().add(t);
+      }
     } else {
       // Table existed, and is okay to replicate into, not dropping and re-creating.
       if (table.isPartitioned()) {
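
The net effect for a CREATE_TABLE event that lands on an existing table is a
two-task chain whose ordering is enforced by the task dependency, as sketched
here with the names from the hunk above:

    Task<?> dropTblTask = dropTableTask(table, x, replicationSpec);
    dropTblTask.addDependentTask(t); // t = the create-table task (plus its load task)
    x.getTasks().add(dropTblTask);   // only the root task is registered, so the
                                     // drop always runs before the re-create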

http://git-wip-us.apache.org/repos/asf/hive/blob/d9fae049/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
index 4cd75d8..7f6e80a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/TableHandler.java
@@ -36,6 +36,7 @@ public class TableHandler extends AbstractMessageHandler {
           new EximUtil.SemanticAnalyzerWrapperContext(
              context.hiveConf, context.db, readEntitySet, writeEntitySet, importTasks, context.log,
               context.nestedContext);
+      x.setEventType(context.dmd.getDumpType());
 
      // REPL LOAD is not partition level. It is always DB or table level. So, passing null for partition specs.
      // Also, REPL LOAD doesn't support external table and hence no location set as well.
