HIVE-19829: Incremental replication load should create tasks in execution phase rather than semantic phase (Mahesh Kumar Behera, reviewed by Sankar Hariappan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/150ef3ba
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/150ef3ba
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/150ef3ba

Branch: refs/heads/branch-3
Commit: 150ef3ba55edfcac26eeaa0574e95a2498096798
Parents: c191ea5
Author: Sankar Hariappan <[email protected]>
Authored: Sat Jul 28 22:35:43 2018 +0530
Committer: Sankar Hariappan <[email protected]>
Committed: Sat Jul 28 22:35:43 2018 +0530

----------------------------------------------------------------------
 .../TestReplicationScenariosAcidTables.java     |  12 +-
 ...TestReplicationScenariosAcrossInstances.java |  77 ++++-
 ql/if/queryplan.thrift                          |   3 +-
 ql/src/gen/thrift/gen-cpp/queryplan_types.cpp   |   8 +-
 ql/src/gen/thrift/gen-cpp/queryplan_types.h     |   3 +-
 .../hadoop/hive/ql/plan/api/StageType.java      |   5 +-
 ql/src/gen/thrift/gen-php/Types.php             |   2 +
 ql/src/gen/thrift/gen-py/queryplan/ttypes.py    |   3 +
 ql/src/gen/thrift/gen-rb/queryplan_types.rb     |   5 +-
 .../apache/hadoop/hive/ql/exec/TaskFactory.java |   4 +-
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java  | 344 +++++++++++++++++++
 .../hadoop/hive/ql/exec/repl/ReplLoadWork.java  | 113 ++++++
 .../hadoop/hive/ql/exec/repl/ReplUtils.java     | 122 -------
 .../repl/bootstrap/AddDependencyToLeaves.java   |  51 ---
 .../ql/exec/repl/bootstrap/ReplLoadTask.java    | 319 -----------------
 .../ql/exec/repl/bootstrap/ReplLoadWork.java    |  88 -----
 .../filesystem/BootstrapEventsIterator.java     |   9 +
 .../repl/bootstrap/load/LoadConstraint.java     |   1 +
 .../exec/repl/bootstrap/load/LoadDatabase.java  |   5 +-
 .../exec/repl/bootstrap/load/LoadFunction.java  |   3 +-
 .../exec/repl/bootstrap/load/TaskTracker.java   | 135 --------
 .../bootstrap/load/table/LoadPartitions.java    |   6 +-
 .../repl/bootstrap/load/table/LoadTable.java    |   6 +-
 .../repl/bootstrap/load/table/TableContext.java |   2 +-
 .../IncrementalLoadEventsIterator.java          |  73 ++++
 .../IncrementalLoadTasksBuilder.java            | 320 +++++++++++++++++
 .../exec/repl/util/AddDependencyToLeaves.java   |  51 +++
 .../hive/ql/exec/repl/util/ReplUtils.java       | 123 +++++++
 .../hive/ql/exec/repl/util/TaskTracker.java     | 145 ++++++++
 .../ql/optimizer/QueryPlanPostProcessor.java    |   2 +-
 .../apache/hadoop/hive/ql/parse/EximUtil.java   |   2 +-
 .../ql/parse/ReplicationSemanticAnalyzer.java   | 295 +---------------
 .../parse/repl/dump/io/PartitionSerializer.java |   2 +-
 .../ql/parse/repl/dump/io/TableSerializer.java  |   2 +-
 .../repl/load/message/AlterDatabaseHandler.java |   2 +-
 .../repl/load/message/CommitTxnHandler.java     |   2 +-
 .../repl/load/message/DropPartitionHandler.java |   2 +-
 .../bootstrap/AddDependencyToLeavesTest.java    |   1 +
 .../repl/bootstrap/load/TestTaskTracker.java    |   1 +
 39 files changed, 1291 insertions(+), 1058 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index f1c0100..b523d9f 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -444,7 +444,7 @@ public class TestReplicationScenariosAcidTables {
 
     WarehouseInstance.Tuple incrementalDump =
             primary.dump(primaryDbName, bootStrapDump.lastReplicationId);
-    replicaNonAcid.loadFailure(replicatedDbName, incrementalDump.dumpLocation)
+    replicaNonAcid.runFailure("REPL LOAD " + replicatedDbName + " FROM '" + 
incrementalDump.dumpLocation + "'")
             .run("REPL STATUS " + replicatedDbName)
             .verifyResult(bootStrapDump.lastReplicationId);
   }
@@ -494,10 +494,8 @@ public class TestReplicationScenariosAcidTables {
     replica.run("use " + replicatedDbName)
             .run("repl status " + replicatedDbName)
             .verifyResult("null")
-            .run("show tables")
-            .verifyResults(new String[] { "t1" })
-            .run("select id from t1")
-            .verifyResults(Arrays.asList("1"));
+            .run("show tables like t2")
+            .verifyResults(new String[] { });
 
     // Retry with different dump should fail.
     replica.loadFailure(replicatedDbName, tuple2.dumpLocation);
@@ -512,10 +510,6 @@ public class TestReplicationScenariosAcidTables {
           LOG.warn("Verifier - DB: " + String.valueOf(args.dbName));
           return false;
         }
-        if (args.tblName != null) {
-          LOG.warn("Verifier - Table: " + String.valueOf(args.tblName));
-          return args.tblName.equals("t2");
-        }
         return true;
       }
     };

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 7c043b9..b2e1e25 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.ql.util.DependencyResolver;
 import org.apache.hadoop.hive.shims.Utils;
@@ -45,6 +45,8 @@ import org.junit.rules.TestName;
 import org.junit.rules.TestRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import 
org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
+import org.junit.Assert;
 
 import java.io.IOException;
 import java.net.URI;
@@ -80,10 +82,11 @@ public class TestReplicationScenariosAcrossInstances {
   protected static final Logger LOG = 
LoggerFactory.getLogger(TestReplicationScenarios.class);
   private static WarehouseInstance primary, replica;
   private String primaryDbName, replicatedDbName;
+  private static HiveConf conf;
 
   @BeforeClass
   public static void classLevelSetup() throws Exception {
-    Configuration conf = new Configuration();
+    conf = new HiveConf(TestReplicationScenarios.class);
     conf.set("dfs.client.use.datanode.hostname", "true");
     conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + 
".hosts", "*");
     MiniDFSCluster miniDFSCluster =
@@ -874,6 +877,56 @@ public class TestReplicationScenariosAcrossInstances {
   }
 
   @Test
+  public void testIncrementalDumpMultiIteration() throws Throwable { // small load-task budgets must split an incremental load into several iterations
+    WarehouseInstance.Tuple bootstrapTuple = primary.dump(primaryDbName, null);
+
+    replica.load(replicatedDbName, bootstrapTuple.dumpLocation)
+            .status(replicatedDbName)
+            .verifyResult(bootstrapTuple.lastReplicationId);
+
+    WarehouseInstance.Tuple incremental = primary.run("use " + primaryDbName)
+            .run("create table table1 (id int) partitioned by (country string)")
+            .run("create table table2 (id int)")
+            .run("create table table3 (id int) partitioned by (country string)")
+            .run("insert into table1 partition(country='india') values(1)")
+            .run("insert into table2 values(2)")
+            .run("insert into table3 partition(country='india') values(3)")
+            .dump(primaryDbName, bootstrapTuple.lastReplicationId);
+
+    replica.load(replicatedDbName, incremental.dumpLocation, Arrays.asList("'hive.repl.approx.max.load.tasks'='10'"))
+            .status(replicatedDbName)
+            .verifyResult(incremental.lastReplicationId)
+            .run("use " + replicatedDbName)
+            .run("select id from table1")
+            .verifyResults(new String[] {"1" })
+            .run("select * from table2")
+            .verifyResults(new String[] {"2" })
+            .run("select id from table3")
+            .verifyResults(new String[] {"3" });
+    Assert.assertTrue("incremental load should need more than one iteration", IncrementalLoadTasksBuilder.getNumIteration() > 1);
+
+    incremental = primary.run("use " + primaryDbName)
+            .run("create table  table5 (key int, value int) partitioned by (load_date date) " +
+                    "clustered by(key) into 2 buckets stored as orc")
+            .run("create table table4 (i int, j int)")
+            .run("insert into table4 values (1,2)")
+            .dump(primaryDbName, incremental.lastReplicationId);
+
+    Path path = new Path(incremental.dumpLocation);
+    FileSystem fs = path.getFileSystem(conf);
+    FileStatus[] fileStatus = fs.listStatus(path);
+    int numEvents = fileStatus.length - 1; // every entry except the metadata file is an event
+
+    replica.load(replicatedDbName, incremental.dumpLocation, Arrays.asList("'hive.repl.approx.max.load.tasks'='1'"))
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[] {"table1", "table2", "table3", "table4", "table5" })
+            .run("select i from table4")
+            .verifyResult("1");
+    Assert.assertEquals("with a budget of 1 task, expected one iteration per event", numEvents, IncrementalLoadTasksBuilder.getNumIteration());
+  }
+
+  @Test
   public void testIfCkptAndSourceOfReplPropsIgnoredByReplDump() throws 
Throwable {
     WarehouseInstance.Tuple tuplePrimary = primary
             .run("use " + primaryDbName)
@@ -1088,9 +1141,7 @@ public class TestReplicationScenariosAcrossInstances {
 
     replica.run("use " + replicatedDbName)
             .run("repl status " + replicatedDbName)
-            .verifyResult("null")
-            .run("show tables")
-            .verifyResults(new String[] { "t1" });
+            .verifyResult("null");
     assertEquals(0, replica.getPrimaryKeyList(replicatedDbName, "t1").size());
     assertEquals(0, replica.getUniqueConstraintList(replicatedDbName, 
"t3").size());
     assertEquals(0, replica.getNotNullConstraintList(replicatedDbName, 
"t3").size());
@@ -1112,10 +1163,6 @@ public class TestReplicationScenariosAcrossInstances {
           LOG.warn("Verifier - DB: " + String.valueOf(args.dbName) + " Func: " 
+ String.valueOf(args.funcName));
           return false;
         }
-        if (args.tblName != null) {
-          LOG.warn("Verifier - Table: " + String.valueOf(args.tblName));
-          return (args.tblName.equals("t2") || args.tblName.equals("t3"));
-        }
         if (args.constraintTblName != null) {
           LOG.warn("Verifier - Constraint Table: " + 
String.valueOf(args.constraintTblName));
           return (args.constraintTblName.equals("t1") || 
args.constraintTblName.equals("t3"));
@@ -1188,8 +1235,6 @@ public class TestReplicationScenariosAcrossInstances {
   public void testBootstrapReplLoadRetryAfterFailureForPartitions() throws 
Throwable {
     WarehouseInstance.Tuple tuple = primary
             .run("use " + primaryDbName)
-            .run("create table t1 (id int)")
-            .run("insert into table t1 values (10)")
             .run("create table t2 (place string) partitioned by (country 
string)")
             .run("insert into table t2 partition(country='india') values 
('bangalore')")
             .run("insert into table t2 partition(country='uk') values 
('london')")
@@ -1204,7 +1249,7 @@ public class TestReplicationScenariosAcrossInstances {
             .dump(primaryDbName, null);
 
     // Inject a behavior where REPL LOAD failed when try to load table "t2" 
and partition "uk".
-    // So, table "t1" and "t2" will exist and partition "india" will exist, 
rest failed as operation failed.
+    // So, table "t2" will exist and partition "india" will exist, rest failed 
as operation failed.
     BehaviourInjection<Partition, Partition> getPartitionStub
             = new BehaviourInjection<Partition, Partition>() {
       @Nullable
@@ -1229,9 +1274,7 @@ public class TestReplicationScenariosAcrossInstances {
             .run("repl status " + replicatedDbName)
             .verifyResult("null")
             .run("show tables")
-            .verifyResults(new String[] { "t1", "t2" })
-            .run("select id from t1")
-            .verifyResults(Arrays.asList("10"))
+            .verifyResults(new String[] {"t2" })
             .run("select country from t2 order by country")
             .verifyResults(Arrays.asList("india"))
             .run("show functions like '" + replicatedDbName + "*'")
@@ -1271,9 +1314,7 @@ public class TestReplicationScenariosAcrossInstances {
             .run("repl status " + replicatedDbName)
             .verifyResult(tuple.lastReplicationId)
             .run("show tables")
-            .verifyResults(new String[] { "t1", "t2" })
-            .run("select id from t1")
-            .verifyResults(Arrays.asList("10"))
+            .verifyResults(new String[] { "t2" })
             .run("select country from t2 order by country")
             .verifyResults(Arrays.asList("india", "uk", "us"))
             .run("show functions like '" + replicatedDbName + "*'")

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/if/queryplan.thrift
----------------------------------------------------------------------
diff --git a/ql/if/queryplan.thrift b/ql/if/queryplan.thrift
index d0ba704..697b5f5 100644
--- a/ql/if/queryplan.thrift
+++ b/ql/if/queryplan.thrift
@@ -104,7 +104,8 @@ enum StageType {
   REPL_DUMP,
   REPL_BOOTSTRAP_LOAD,
   REPL_STATE_LOG,
-  REPL_TXN
+  REPL_TXN,
+  REPL_INCREMENTAL_LOAD
 }
 
 struct Stage {

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp 
b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
index e28ac4c..fd04675 100644
--- a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
+++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
@@ -121,7 +121,8 @@ int _kStageTypeValues[] = {
   StageType::REPL_DUMP,
   StageType::REPL_BOOTSTRAP_LOAD,
   StageType::REPL_STATE_LOG,
-  StageType::REPL_TXN
+  StageType::REPL_TXN,
+  StageType::REPL_INCREMENTAL_LOAD
 };
 const char* _kStageTypeNames[] = {
   "CONDITIONAL",
@@ -139,9 +140,10 @@ const char* _kStageTypeNames[] = {
   "REPL_DUMP",
   "REPL_BOOTSTRAP_LOAD",
   "REPL_STATE_LOG",
-  "REPL_TXN"
+  "REPL_TXN",
+  "REPL_INCREMENTAL_LOAD"
 };
-const std::map<int, const char*> 
_StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(16, 
_kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, 
NULL));
+const std::map<int, const char*> 
_StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(17, 
_kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, 
NULL));
 
 
 Adjacency::~Adjacency() throw() {

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/gen/thrift/gen-cpp/queryplan_types.h
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.h 
b/ql/src/gen/thrift/gen-cpp/queryplan_types.h
index 6bdea4b..8bd11c0 100644
--- a/ql/src/gen/thrift/gen-cpp/queryplan_types.h
+++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.h
@@ -98,7 +98,8 @@ struct StageType {
     REPL_DUMP = 12,
     REPL_BOOTSTRAP_LOAD = 13,
     REPL_STATE_LOG = 14,
-    REPL_TXN = 15
+    REPL_TXN = 15,
+    REPL_INCREMENTAL_LOAD = 16
   };
 };
 

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
----------------------------------------------------------------------
diff --git 
a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
 
b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
index 08822b3..7eebe28 100644
--- 
a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
+++ 
b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
@@ -27,7 +27,8 @@ public enum StageType implements org.apache.thrift.TEnum {
   REPL_DUMP(12),
   REPL_BOOTSTRAP_LOAD(13),
   REPL_STATE_LOG(14),
-  REPL_TXN(15);
+  REPL_TXN(15),
+  REPL_INCREMENTAL_LOAD(16);
 
   private final int value;
 
@@ -80,6 +81,8 @@ public enum StageType implements org.apache.thrift.TEnum {
         return REPL_STATE_LOG;
       case 15:
         return REPL_TXN;
+      case 16:
+        return REPL_INCREMENTAL_LOAD;
       default:
         return null;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/gen/thrift/gen-php/Types.php
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-php/Types.php 
b/ql/src/gen/thrift/gen-php/Types.php
index 4ceec88..937dad2 100644
--- a/ql/src/gen/thrift/gen-php/Types.php
+++ b/ql/src/gen/thrift/gen-php/Types.php
@@ -120,6 +120,7 @@ final class StageType {
   const REPL_BOOTSTRAP_LOAD = 13;
   const REPL_STATE_LOG = 14;
   const REPL_TXN = 15;
+  const REPL_INCREMENTAL_LOAD = 16;
   static public $__names = array(
     0 => 'CONDITIONAL',
     1 => 'COPY',
@@ -137,6 +138,7 @@ final class StageType {
     13 => 'REPL_BOOTSTRAP_LOAD',
     14 => 'REPL_STATE_LOG',
     15 => 'REPL_TXN',
+    16 => 'REPL_INCREMENTAL_LOAD',
   );
 }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py 
b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
index 5638d35..f61f27b 100644
--- a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
+++ b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
@@ -167,6 +167,7 @@ class StageType:
   REPL_BOOTSTRAP_LOAD = 13
   REPL_STATE_LOG = 14
   REPL_TXN = 15
+  REPL_INCREMENTAL_LOAD = 16
 
   _VALUES_TO_NAMES = {
     0: "CONDITIONAL",
@@ -185,6 +186,7 @@ class StageType:
     13: "REPL_BOOTSTRAP_LOAD",
     14: "REPL_STATE_LOG",
     15: "REPL_TXN",
+    16: "REPL_INCREMENTAL_LOAD",
   }
 
   _NAMES_TO_VALUES = {
@@ -204,6 +206,7 @@ class StageType:
     "REPL_BOOTSTRAP_LOAD": 13,
     "REPL_STATE_LOG": 14,
     "REPL_TXN": 15,
+    "REPL_INCREMENTAL_LOAD": 16,
   }
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/gen/thrift/gen-rb/queryplan_types.rb
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-rb/queryplan_types.rb 
b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
index 04af975..2867df3 100644
--- a/ql/src/gen/thrift/gen-rb/queryplan_types.rb
+++ b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
@@ -77,8 +77,9 @@ module StageType
   REPL_BOOTSTRAP_LOAD = 13
   REPL_STATE_LOG = 14
   REPL_TXN = 15
-  VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 
=> "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => 
"STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 
13 => "REPL_BOOTSTRAP_LOAD", 14 => "REPL_STATE_LOG", 15 => "REPL_TXN"}
-  VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, 
FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, 
REPL_BOOTSTRAP_LOAD, REPL_STATE_LOG, REPL_TXN]).freeze
+  REPL_INCREMENTAL_LOAD = 16
+  VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 
=> "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => 
"STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 
13 => "REPL_BOOTSTRAP_LOAD", 14 => "REPL_STATE_LOG", 15 => "REPL_TXN", 16 => 
"REPL_INCREMENTAL_LOAD"}
+  VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, 
FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, 
REPL_BOOTSTRAP_LOAD, REPL_STATE_LOG, REPL_TXN, REPL_INCREMENTAL_LOAD]).freeze
 end
 
 class Adjacency

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index 3a107b7..47a802f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -29,8 +29,8 @@ import org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask;
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
 import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogTask;
 import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadTask;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork;
+import org.apache.hadoop.hive.ql.exec.repl.ReplLoadTask;
+import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileTask;

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
new file mode 100644
index 0000000..d6f41fa
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.PartitionEvent;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
+import 
org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator;
+import 
org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadConstraint;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadFunction;
+import 
org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadPartitions;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadTable;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase.AlterDatabase;
+
+public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
+  private static final int ZERO_TASKS = 0; // capacity used for placeholder TaskTrackers
+
+  @Override
+  public String getName() {
+    return work.isIncrementalLoad() ? "REPL_INCREMENTAL_LOAD" : "REPL_BOOTSTRAP_LOAD"; // mirrors StageType constant names
+  }
+
+  /**
+   * Holds the root tasks produced by one run of this load task, which the driver executes next.
+   * Nothing in here is carried over between successive runs of the load task.
+   */
+  private static class Scope {
+    boolean database, table, partition; // all start out false
+    List<Task<? extends Serializable>> rootTasks = new ArrayList<>();
+  }
+
+  @Override
+  protected int execute(DriverContext driverContext) {
+    // Bootstrap and incremental dumps are replayed by separate code paths.
+    if (!work.isIncrementalLoad()) {
+      return executeBootStrapLoad(driverContext);
+    }
+    return executeIncrementalLoad(driverContext);
+  }
+
+  private int executeBootStrapLoad(DriverContext driverContext) {
+    try {
+      int maxTasks = 
conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
+      Context context = new Context(work.dumpDirectory, conf, getHive(),
+              work.sessionStateLineageState, driverContext.getCtx());
+      TaskTracker loadTaskTracker = new TaskTracker(maxTasks);
+      /*
+          for now for simplicity we are doing just one directory ( one 
database ), come back to use
+          of multiple databases once we have the basic flow to chain creating 
of tasks in place for
+          a database ( directory )
+      */
+      BootstrapEventsIterator iterator = work.iterator();
+      ConstraintEventsIterator constraintIterator = work.constraintIterator();
+      /*
+      This is used to get hold of a reference during the current creation of 
tasks and is initialized
+      with "0" tasks such that it will be non consequential in any operations 
done with task tracker
+      compositions.
+       */
+      TaskTracker dbTracker = new TaskTracker(ZERO_TASKS);
+      TaskTracker tableTracker = new TaskTracker(ZERO_TASKS);
+      Scope scope = new Scope();
+      boolean loadingConstraint = false;
+      if (!iterator.hasNext() && constraintIterator.hasNext()) {
+        loadingConstraint = true;
+      }
+      while ((iterator.hasNext() || (loadingConstraint && 
constraintIterator.hasNext())) && loadTaskTracker.canAddMoreTasks()) {
+        BootstrapEvent next;
+        if (!loadingConstraint) {
+          next = iterator.next();
+        } else {
+          next = constraintIterator.next();
+        }
+        switch (next.eventType()) {
+        case Database:
+          DatabaseEvent dbEvent = (DatabaseEvent) next;
+          dbTracker =
+              new LoadDatabase(context, dbEvent, work.dbNameToLoadIn, 
loadTaskTracker)
+                  .tasks();
+          loadTaskTracker.update(dbTracker);
+          if (work.hasDbState()) {
+            loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, 
scope));
+          }
+          work.updateDbEventState(dbEvent.toState());
+          if (dbTracker.hasTasks()) {
+            scope.rootTasks.addAll(dbTracker.tasks());
+            scope.database = true;
+          }
+          dbTracker.debugLog("database");
+          break;
+        case Table: {
+          /*
+              Implicit assumption here is that database level is processed 
first before table level,
+              which will depend on the iterator used since it should provide 
the higher level directory
+              listing before providing the lower level listing. This is also 
required such that
+              the dbTracker /  tableTracker are setup correctly always.
+           */
+          TableContext tableContext =
+              new TableContext(dbTracker, work.dbNameToLoadIn, 
work.tableNameToLoadIn);
+          TableEvent tableEvent = (TableEvent) next;
+          LoadTable loadTable = new LoadTable(tableEvent, context, 
iterator.replLogger(),
+                                              tableContext, loadTaskTracker);
+          tableTracker = loadTable.tasks();
+          setUpDependencies(dbTracker, tableTracker);
+          if (!scope.database && tableTracker.hasTasks()) {
+            scope.rootTasks.addAll(tableTracker.tasks());
+            scope.table = true;
+          }
+          /*
+            for table replication if we reach the max number of tasks then for 
the next run we will
+            try to reload the same table again, this is mainly for ease of 
understanding the code
+            as then we can avoid handling == > loading partitions for the 
table given that
+            the creation of table lead to reaching max tasks vs,  loading next 
table since current
+            one does not have partitions.
+           */
+
+          // for a table we explicitly try to load partitions as there is no 
separate partitions events.
+          LoadPartitions loadPartitions =
+              new LoadPartitions(context, iterator.replLogger(), 
loadTaskTracker, tableEvent,
+                      work.dbNameToLoadIn, tableContext);
+          TaskTracker partitionsTracker = loadPartitions.tasks();
+          partitionsPostProcessing(iterator, scope, loadTaskTracker, 
tableTracker,
+              partitionsTracker);
+          tableTracker.debugLog("table");
+          partitionsTracker.debugLog("partitions for table");
+          break;
+        }
+        case Partition: {
+          /*
+              This will happen only when loading tables and we reach the limit 
of number of tasks we can create;
+              hence we know here that the table should exist and there should 
be a lastPartitionName
+          */
+          PartitionEvent event = (PartitionEvent) next;
+          TableContext tableContext = new TableContext(dbTracker, 
work.dbNameToLoadIn,
+              work.tableNameToLoadIn);
+          LoadPartitions loadPartitions =
+              new LoadPartitions(context, iterator.replLogger(), tableContext, 
loadTaskTracker,
+                      event.asTableEvent(), work.dbNameToLoadIn, 
event.lastPartitionReplicated());
+          /*
+               the tableTracker here should be a new instance and not an 
existing one as this can
+               only happen when we break in between loading partitions.
+           */
+          TaskTracker partitionsTracker = loadPartitions.tasks();
+          partitionsPostProcessing(iterator, scope, loadTaskTracker, 
tableTracker,
+              partitionsTracker);
+          partitionsTracker.debugLog("partitions");
+          break;
+        }
+        case Function: {
+          LoadFunction loadFunction = new LoadFunction(context, 
iterator.replLogger(),
+                                              (FunctionEvent) next, 
work.dbNameToLoadIn, dbTracker);
+          TaskTracker functionsTracker = loadFunction.tasks();
+          if (!scope.database) {
+            scope.rootTasks.addAll(functionsTracker.tasks());
+          } else {
+            setUpDependencies(dbTracker, functionsTracker);
+          }
+          loadTaskTracker.update(functionsTracker);
+          functionsTracker.debugLog("functions");
+          break;
+        }
+        case Constraint: {
+          LoadConstraint loadConstraint =
+              new LoadConstraint(context, (ConstraintEvent) next, 
work.dbNameToLoadIn, dbTracker);
+          TaskTracker constraintTracker = loadConstraint.tasks();
+          scope.rootTasks.addAll(constraintTracker.tasks());
+          loadTaskTracker.update(constraintTracker);
+          constraintTracker.debugLog("constraints");
+        }
+        }
+
+        if (!loadingConstraint && !iterator.currentDbHasNext()) {
+          createEndReplLogTask(context, scope, iterator.replLogger());
+        }
+      }
+      boolean addAnotherLoadTask = iterator.hasNext() || 
loadTaskTracker.hasReplicationState()
+          || constraintIterator.hasNext();
+      if (addAnotherLoadTask) {
+        createBuilderTask(scope.rootTasks);
+      }
+      if (!iterator.hasNext() && !constraintIterator.hasNext()) {
+        loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, 
scope));
+        work.updateDbEventState(null);
+      }
+      this.childTasks = scope.rootTasks;
+      /*
+      Since there can be multiple rounds of this run all of which will be tied 
to the same
+      query id -- generated in compile phase , adding a additional UUID to the 
end to print each run
+      in separate files.
+       */
+      LOG.info("Root Tasks / Total Tasks : {} / {} ", childTasks.size(), 
loadTaskTracker.numberOfTasks());
+
+      // Populate the driver context with the scratch dir info from the repl 
context, so that the temp dirs will be cleaned up later
+      
driverContext.getCtx().getFsScratchDirs().putAll(context.pathInfo.getFsScratchDirs());
+    }  catch (RuntimeException e) {
+      LOG.error("replication failed with run time exception", e);
+      throw e;
+    } catch (Exception e) {
+      LOG.error("replication failed", e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    }
+    LOG.info("completed load task run : {}", work.executedLoadTask());
+    return 0;
+  }
+
+  private void createEndReplLogTask(Context context, Scope scope,
+                                                  ReplLogger replLogger) 
throws SemanticException {
+    Database dbInMetadata = 
work.databaseEvent(context.hiveConf).dbInMetadata(work.dbNameToLoadIn);
+    ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, 
dbInMetadata.getParameters());
+    Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork);
+    if (scope.rootTasks.isEmpty()) {
+      scope.rootTasks.add(replLogTask);
+    } else {
+      DAGTraversal.traverse(scope.rootTasks,
+          new AddDependencyToLeaves(Collections.singletonList(replLogTask)));
+    }
+  }
+
+  /**
+   * There was a database update done before and we want to make sure we 
update the last repl
+   * id on this database as we are now going to switch to processing a new 
database.
+   *
+   * This has to be last task in the graph since if there are intermediate 
tasks and the last.repl.id
+   * is a root level task then in the execution phase the root level tasks 
will get executed first,
+   * however if any of the child tasks of the bootstrap load failed then even 
though the bootstrap has failed
+   * the last repl status of the target database will return a valid value, 
which will not represent
+   * the state of the database.
+   */
+  private TaskTracker updateDatabaseLastReplID(int maxTasks, Context context, 
Scope scope)
+      throws SemanticException {
+  /*
+    we don't want to put any limits on this task as this is essential before 
we start
+    processing new database events.
+   */
+    TaskTracker taskTracker =
+        new AlterDatabase(context, work.databaseEvent(context.hiveConf), 
work.dbNameToLoadIn,
+            new TaskTracker(maxTasks)).tasks();
+
+    AddDependencyToLeaves function = new 
AddDependencyToLeaves(taskTracker.tasks());
+    DAGTraversal.traverse(scope.rootTasks, function);
+
+    return taskTracker;
+  }
+
+  private void partitionsPostProcessing(BootstrapEventsIterator iterator,
+      Scope scope, TaskTracker loadTaskTracker, TaskTracker tableTracker,
+      TaskTracker partitionsTracker) throws SemanticException {
+    setUpDependencies(tableTracker, partitionsTracker);
+    if (!scope.database && !scope.table) {
+      scope.rootTasks.addAll(partitionsTracker.tasks());
+      scope.partition = true;
+    }
+    loadTaskTracker.update(tableTracker);
+    loadTaskTracker.update(partitionsTracker);
+    if (partitionsTracker.hasReplicationState()) {
+      iterator.setReplicationState(partitionsTracker.replicationState());
+    }
+  }
+
+  /*
+      This sets up dependencies such that a child task is dependant on the 
parent to be complete.
+   */
+  private void setUpDependencies(TaskTracker parentTasks, TaskTracker 
childTasks) {
+    if (parentTasks.hasTasks()) {
+      for (Task<? extends Serializable> parentTask : parentTasks.tasks()) {
+        for (Task<? extends Serializable> childTask : childTasks.tasks()) {
+          parentTask.addDependentTask(childTask);
+        }
+      }
+    } else {
+      for (Task<? extends Serializable> childTask : childTasks.tasks()) {
+        parentTasks.addTask(childTask);
+      }
+    }
+  }
+
+  private void createBuilderTask(List<Task<? extends Serializable>> rootTasks) 
{
+    // Use loadTask as dependencyCollection
+    Task<ReplLoadWork> loadTask = TaskFactory.get(work, conf);
+    DAGTraversal.traverse(rootTasks, new AddDependencyToLeaves(loadTask));
+  }
+
+  @Override
+  public StageType getType() {
+    return work.isIncrementalLoad() ? StageType.REPL_INCREMENTAL_LOAD : 
StageType.REPL_BOOTSTRAP_LOAD;
+  }
+
+  private int executeIncrementalLoad(DriverContext driverContext) {
+    try {
+      IncrementalLoadTasksBuilder load = work.getIncrementalLoadTaskBuilder();
+      this.childTasks = Collections.singletonList(load.build(driverContext, 
getHive(), LOG));
+      if (work.getIncrementalIterator().hasNext()) {
+        // attach a load task at the tail of task list to start the next 
iteration.
+        createBuilderTask(this.childTasks);
+      }
+      return 0;
+    } catch (Exception e) {
+      LOG.error("failed replication", e);
+      setException(e);
+      return 1;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
new file mode 100644
index 0000000..8921e94
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
+import 
org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator;
+import 
org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator;
+import 
org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadEventsIterator;
+import 
org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.session.LineageState;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+@Explain(displayName = "Replication Load Operator", explainLevels = { 
Explain.Level.USER,
+    Explain.Level.DEFAULT,
+    Explain.Level.EXTENDED })
+public class ReplLoadWork implements Serializable {
+  final String dbNameToLoadIn;
+  final String tableNameToLoadIn;
+  final String dumpDirectory;
+  private final transient BootstrapEventsIterator bootstrapIterator;
+  private final ConstraintEventsIterator constraintsIterator;
+  private final transient IncrementalLoadEventsIterator incrementalIterator;
+  private int loadTaskRunCount = 0;
+  private DatabaseEvent.State state = null;
+  private final transient IncrementalLoadTasksBuilder incrementalLoad;
+
+  /*
+  these are sessionState objects that are copied over to work to allow for 
parallel execution.
+  based on the current use case the methods are selectively synchronized, 
which might need to be
+  taken care when using other methods.
+  */
+  final LineageState sessionStateLineageState;
+
+  public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String 
dbNameToLoadIn,
+      String tableNameToLoadIn, LineageState lineageState, boolean 
isIncrementalDump) throws IOException {
+    this.tableNameToLoadIn = tableNameToLoadIn;
+    sessionStateLineageState = lineageState;
+    this.dumpDirectory = dumpDirectory;
+    this.dbNameToLoadIn = dbNameToLoadIn;
+    if (isIncrementalDump) {
+      incrementalIterator = new IncrementalLoadEventsIterator(dumpDirectory, 
hiveConf);
+      this.bootstrapIterator = null;
+      this.constraintsIterator = null;
+      incrementalLoad = new IncrementalLoadTasksBuilder(dbNameToLoadIn, 
tableNameToLoadIn, dumpDirectory,
+              incrementalIterator, hiveConf);
+    } else {
+      this.bootstrapIterator = new BootstrapEventsIterator(dumpDirectory, 
dbNameToLoadIn, hiveConf);
+      this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, 
hiveConf);
+      incrementalIterator = null;
+      incrementalLoad = null;
+    }
+  }
+
+  public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String 
dbNameOrPattern,
+      LineageState lineageState) throws IOException {
+    this(hiveConf, dumpDirectory, dbNameOrPattern, null, lineageState, false);
+  }
+
+  public BootstrapEventsIterator iterator() {
+    return bootstrapIterator;
+  }
+
+  public ConstraintEventsIterator constraintIterator() {
+    return constraintsIterator;
+  }
+
+  int executedLoadTask() {
+    return ++loadTaskRunCount;
+  }
+
+  void updateDbEventState(DatabaseEvent.State state) {
+    this.state = state;
+  }
+
+  DatabaseEvent databaseEvent(HiveConf hiveConf) {
+    return state.toEvent(hiveConf);
+  }
+
+  boolean hasDbState() {
+    return state != null;
+  }
+
+  public boolean isIncrementalLoad() {
+    return incrementalIterator != null;
+  }
+
+  public IncrementalLoadEventsIterator getIncrementalIterator() {
+    return incrementalIterator;
+  }
+
+  public IncrementalLoadTasksBuilder getIncrementalLoadTaskBuilder() {
+    return incrementalLoad;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java
deleted file mode 100644
index e41e4b5..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplUtils.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec.repl;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
-import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
-import org.apache.hadoop.hive.ql.plan.DDLWork;
-import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
-import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
-import org.apache.hadoop.hive.ql.stats.StatsUtils;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-
-
-public class ReplUtils {
-
-  public static final String REPL_CHECKPOINT_KEY = "hive.repl.ckpt.key";
-
-  /**
-   * Bootstrap REPL LOAD operation type on the examined object based on ckpt 
state.
-   */
-  public enum ReplLoadOpType {
-    LOAD_NEW, LOAD_SKIP, LOAD_REPLACE
-  }
-
-  public static Map<Integer, List<ExprNodeGenericFuncDesc>> genPartSpecs(
-          Table table, List<Map<String, String>> partitions) throws 
SemanticException {
-    Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs = new HashMap<>();
-    int partPrefixLength = 0;
-    if (partitions.size() > 0) {
-      partPrefixLength = partitions.get(0).size();
-      // pick the length of the first ptn, we expect all ptns listed to have 
the same number of
-      // key-vals.
-    }
-    List<ExprNodeGenericFuncDesc> partitionDesc = new ArrayList<>();
-    for (Map<String, String> ptn : partitions) {
-      // convert each key-value-map to appropriate expression.
-      ExprNodeGenericFuncDesc expr = null;
-      for (Map.Entry<String, String> kvp : ptn.entrySet()) {
-        String key = kvp.getKey();
-        Object val = kvp.getValue();
-        String type = table.getPartColByName(key).getType();
-        PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo(type);
-        ExprNodeColumnDesc column = new ExprNodeColumnDesc(pti, key, null, 
true);
-        ExprNodeGenericFuncDesc op = DDLSemanticAnalyzer.makeBinaryPredicate(
-                "=", column, new 
ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, val));
-        expr = (expr == null) ? op : 
DDLSemanticAnalyzer.makeBinaryPredicate("and", expr, op);
-      }
-      if (expr != null) {
-        partitionDesc.add(expr);
-      }
-    }
-    if (partitionDesc.size() > 0) {
-      partSpecs.put(partPrefixLength, partitionDesc);
-    }
-    return partSpecs;
-  }
-
-  public static Task<?> getTableReplLogTask(ImportTableDesc tableDesc, 
ReplLogger replLogger, HiveConf conf)
-          throws SemanticException {
-    ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, 
tableDesc.getTableName(), tableDesc.tableType());
-    return TaskFactory.get(replLogWork, conf);
-  }
-
-  public static Task<?> getTableCheckpointTask(ImportTableDesc tableDesc, 
HashMap<String, String> partSpec,
-                                               String dumpRoot, HiveConf conf) 
throws SemanticException {
-    HashMap<String, String> mapProp = new HashMap<>();
-    mapProp.put(REPL_CHECKPOINT_KEY, dumpRoot);
-
-    AlterTableDesc alterTblDesc =  new 
AlterTableDesc(AlterTableDesc.AlterTableTypes.ADDPROPS);
-    alterTblDesc.setProps(mapProp);
-    alterTblDesc.setOldName(
-            StatsUtils.getFullyQualifiedTableName(tableDesc.getDatabaseName(), 
tableDesc.getTableName()));
-    if (partSpec != null) {
-      alterTblDesc.setPartSpec(partSpec);
-    }
-    return TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), 
alterTblDesc), conf);
-  }
-
-  public static boolean replCkptStatus(String dbName, Map<String, String> 
props, String dumpRoot)
-          throws InvalidOperationException {
-    // If ckpt property not set or empty means, bootstrap is not run on this 
object.
-    if ((props != null) && props.containsKey(REPL_CHECKPOINT_KEY) && 
!props.get(REPL_CHECKPOINT_KEY).isEmpty()) {
-      if (props.get(REPL_CHECKPOINT_KEY).equals(dumpRoot)) {
-        return true;
-      }
-      throw new 
InvalidOperationException(ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.format(dumpRoot,
-              props.get(REPL_CHECKPOINT_KEY)));
-    }
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeaves.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeaves.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeaves.java
deleted file mode 100644
index 0313058..0000000
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeaves.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec.repl.bootstrap;
-
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.List;
-
-public class AddDependencyToLeaves implements DAGTraversal.Function {
-  private List<Task<? extends Serializable>> postDependencyCollectionTasks;
-
-  AddDependencyToLeaves(List<Task<? extends Serializable>> 
postDependencyCollectionTasks) {
-    this.postDependencyCollectionTasks = postDependencyCollectionTasks;
-  }
-
-  public AddDependencyToLeaves(Task<? extends Serializable> 
postDependencyTask) {
-    this(Collections.singletonList(postDependencyTask));
-  }
-
-
-  @Override
-  public void process(Task<? extends Serializable> task) {
-    if (task.getChildTasks() == null) {
-      postDependencyCollectionTasks.forEach(task::addDependentTask);
-    }
-  }
-
-  @Override
-  public boolean skipProcessing(Task<? extends Serializable> task) {
-    return postDependencyCollectionTasks.contains(task);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
deleted file mode 100644
index b33a774..0000000
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec.repl.bootstrap;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.PartitionEvent;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
-import 
org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator;
-import 
org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadConstraint;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadFunction;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadPartitions;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadTable;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
-import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
-import org.apache.hadoop.hive.ql.plan.api.StageType;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static 
org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase.AlterDatabase;
-
-public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
-  private final static int ZERO_TASKS = 0;
-
-  @Override
-  public String getName() {
-    return "REPL_BOOTSTRAP_LOAD";
-  }
-
-  /**
-   * Provides the root Tasks created as a result of this loadTask run which 
will be executed
-   * by the driver. It does not track details across multiple runs of LoadTask.
-   */
-  private static class Scope {
-    boolean database = false, table = false, partition = false;
-    List<Task<? extends Serializable>> rootTasks = new ArrayList<>();
-  }
-
-  @Override
-  protected int execute(DriverContext driverContext) {
-    try {
-      int maxTasks = 
conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
-      Context context = new Context(work.dumpDirectory, conf, getHive(),
-              work.sessionStateLineageState, driverContext.getCtx());
-      TaskTracker loadTaskTracker = new TaskTracker(maxTasks);
-      /*
-          for now for simplicity we are doing just one directory ( one 
database ), come back to use
-          of multiple databases once we have the basic flow to chain creating 
of tasks in place for
-          a database ( directory )
-      */
-      BootstrapEventsIterator iterator = work.iterator();
-      ConstraintEventsIterator constraintIterator = work.constraintIterator();
-      /*
-      This is used to get hold of a reference during the current creation of 
tasks and is initialized
-      with "0" tasks such that it will be non consequential in any operations 
done with task tracker
-      compositions.
-       */
-      TaskTracker dbTracker = new TaskTracker(ZERO_TASKS);
-      TaskTracker tableTracker = new TaskTracker(ZERO_TASKS);
-      Scope scope = new Scope();
-      boolean loadingConstraint = false;
-      if (!iterator.hasNext() && constraintIterator.hasNext()) {
-        loadingConstraint = true;
-      }
-      while ((iterator.hasNext() || (loadingConstraint && 
constraintIterator.hasNext())) && loadTaskTracker.canAddMoreTasks()) {
-        BootstrapEvent next;
-        if (!loadingConstraint) {
-          next = iterator.next();
-        } else {
-          next = constraintIterator.next();
-        }
-        switch (next.eventType()) {
-        case Database:
-          DatabaseEvent dbEvent = (DatabaseEvent) next;
-          dbTracker =
-              new LoadDatabase(context, dbEvent, work.dbNameToLoadIn, 
loadTaskTracker)
-                  .tasks();
-          loadTaskTracker.update(dbTracker);
-          if (work.hasDbState()) {
-            loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, 
scope));
-          }
-          work.updateDbEventState(dbEvent.toState());
-          if (dbTracker.hasTasks()) {
-            scope.rootTasks.addAll(dbTracker.tasks());
-            scope.database = true;
-          }
-          dbTracker.debugLog("database");
-          break;
-        case Table: {
-          /*
-              Implicit assumption here is that database level is processed 
first before table level,
-              which will depend on the iterator used since it should provide 
the higher level directory
-              listing before providing the lower level listing. This is also 
required such that
-              the dbTracker /  tableTracker are setup correctly always.
-           */
-          TableContext tableContext =
-              new TableContext(dbTracker, work.dbNameToLoadIn, 
work.tableNameToLoadIn);
-          TableEvent tableEvent = (TableEvent) next;
-          LoadTable loadTable = new LoadTable(tableEvent, context, 
iterator.replLogger(),
-                                              tableContext, loadTaskTracker);
-          tableTracker = loadTable.tasks();
-          setUpDependencies(dbTracker, tableTracker);
-          if (!scope.database && tableTracker.hasTasks()) {
-            scope.rootTasks.addAll(tableTracker.tasks());
-            scope.table = true;
-          }
-          /*
-            for table replication if we reach the max number of tasks then for 
the next run we will
-            try to reload the same table again, this is mainly for ease of 
understanding the code
-            as then we can avoid handling == > loading partitions for the 
table given that
-            the creation of table lead to reaching max tasks vs,  loading next 
table since current
-            one does not have partitions.
-           */
-
-          // for a table we explicitly try to load partitions as there is no 
separate partitions events.
-          LoadPartitions loadPartitions =
-              new LoadPartitions(context, iterator.replLogger(), 
loadTaskTracker, tableEvent,
-                      work.dbNameToLoadIn, tableContext);
-          TaskTracker partitionsTracker = loadPartitions.tasks();
-          partitionsPostProcessing(iterator, scope, loadTaskTracker, 
tableTracker,
-              partitionsTracker);
-          tableTracker.debugLog("table");
-          partitionsTracker.debugLog("partitions for table");
-          break;
-        }
-        case Partition: {
-          /*
-              This will happen only when loading tables and we reach the limit 
of number of tasks we can create;
-              hence we know here that the table should exist and there should 
be a lastPartitionName
-          */
-          PartitionEvent event = (PartitionEvent) next;
-          TableContext tableContext = new TableContext(dbTracker, 
work.dbNameToLoadIn,
-              work.tableNameToLoadIn);
-          LoadPartitions loadPartitions =
-              new LoadPartitions(context, iterator.replLogger(), tableContext, 
loadTaskTracker,
-                      event.asTableEvent(), work.dbNameToLoadIn, 
event.lastPartitionReplicated());
-          /*
-               the tableTracker here should be a new instance and not an 
existing one as this can
-               only happen when we break in between loading partitions.
-           */
-          TaskTracker partitionsTracker = loadPartitions.tasks();
-          partitionsPostProcessing(iterator, scope, loadTaskTracker, 
tableTracker,
-              partitionsTracker);
-          partitionsTracker.debugLog("partitions");
-          break;
-        }
-        case Function: {
-          LoadFunction loadFunction = new LoadFunction(context, 
iterator.replLogger(),
-                                              (FunctionEvent) next, 
work.dbNameToLoadIn, dbTracker);
-          TaskTracker functionsTracker = loadFunction.tasks();
-          if (!scope.database) {
-            scope.rootTasks.addAll(functionsTracker.tasks());
-          } else {
-            setUpDependencies(dbTracker, functionsTracker);
-          }
-          loadTaskTracker.update(functionsTracker);
-          functionsTracker.debugLog("functions");
-          break;
-        }
-        case Constraint: {
-          LoadConstraint loadConstraint =
-              new LoadConstraint(context, (ConstraintEvent) next, 
work.dbNameToLoadIn, dbTracker);
-          TaskTracker constraintTracker = loadConstraint.tasks();
-          scope.rootTasks.addAll(constraintTracker.tasks());
-          loadTaskTracker.update(constraintTracker);
-          constraintTracker.debugLog("constraints");
-        }
-        }
-
-        if (!loadingConstraint && !iterator.currentDbHasNext()) {
-          createEndReplLogTask(context, scope, iterator.replLogger());
-        }
-      }
-      boolean addAnotherLoadTask = iterator.hasNext() || 
loadTaskTracker.hasReplicationState()
-          || constraintIterator.hasNext();
-      createBuilderTask(scope.rootTasks, addAnotherLoadTask);
-      if (!iterator.hasNext() && !constraintIterator.hasNext()) {
-        loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, 
scope));
-        work.updateDbEventState(null);
-      }
-      this.childTasks = scope.rootTasks;
-      /*
-      Since there can be multiple rounds of this run all of which will be tied 
to the same
-      query id -- generated in compile phase , adding a additional UUID to the 
end to print each run
-      in separate files.
-       */
-      LOG.info("Root Tasks / Total Tasks : {} / {} ", childTasks.size(), 
loadTaskTracker.numberOfTasks());
-
-      // Populate the driver context with the scratch dir info from the repl 
context, so that the temp dirs will be cleaned up later
-      
driverContext.getCtx().getFsScratchDirs().putAll(context.pathInfo.getFsScratchDirs());
-    } catch (Exception e) {
-      LOG.error("failed replication", e);
-      setException(e);
-      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
-    }
-    LOG.info("completed load task run : {}", work.executedLoadTask());
-    return 0;
-  }
-
-  private void createEndReplLogTask(Context context, Scope scope,
-                                                  ReplLogger replLogger) 
throws SemanticException {
-    Database dbInMetadata = 
work.databaseEvent(context.hiveConf).dbInMetadata(work.dbNameToLoadIn);
-    ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, 
dbInMetadata.getParameters());
-    Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork);
-    if (scope.rootTasks.isEmpty()) {
-      scope.rootTasks.add(replLogTask);
-    } else {
-      DAGTraversal.traverse(scope.rootTasks,
-          new AddDependencyToLeaves(Collections.singletonList(replLogTask)));
-    }
-  }
-
-  /**
-   * There was a database update done before and we want to make sure we 
update the last repl
-   * id on this database as we are now going to switch to processing a new 
database.
-   *
-   * This has to be last task in the graph since if there are intermediate 
tasks and the last.repl.id
-   * is a root level task then in the execution phase the root level tasks 
will get executed first,
-   * however if any of the child tasks of the bootstrap load failed then even 
though the bootstrap has failed
-   * the last repl status of the target database will return a valid value, 
which will not represent
-   * the state of the database.
-   */
-  private TaskTracker updateDatabaseLastReplID(int maxTasks, Context context, 
Scope scope)
-      throws SemanticException {
-  /*
-    we don't want to put any limits on this task as this is essential before 
we start
-    processing new database events.
-   */
-    TaskTracker taskTracker =
-        new AlterDatabase(context, work.databaseEvent(context.hiveConf), 
work.dbNameToLoadIn,
-            new TaskTracker(maxTasks)).tasks();
-
-    AddDependencyToLeaves function = new 
AddDependencyToLeaves(taskTracker.tasks());
-    DAGTraversal.traverse(scope.rootTasks, function);
-
-    return taskTracker;
-  }
-
-  private void partitionsPostProcessing(BootstrapEventsIterator iterator,
-      Scope scope, TaskTracker loadTaskTracker, TaskTracker tableTracker,
-      TaskTracker partitionsTracker) throws SemanticException {
-    setUpDependencies(tableTracker, partitionsTracker);
-    if (!scope.database && !scope.table) {
-      scope.rootTasks.addAll(partitionsTracker.tasks());
-      scope.partition = true;
-    }
-    loadTaskTracker.update(tableTracker);
-    loadTaskTracker.update(partitionsTracker);
-    if (partitionsTracker.hasReplicationState()) {
-      iterator.setReplicationState(partitionsTracker.replicationState());
-    }
-  }
-
-  /*
-      This sets up dependencies such that a child task is dependant on the 
parent to be complete.
-   */
-  private void setUpDependencies(TaskTracker parentTasks, TaskTracker 
childTasks) {
-    if (parentTasks.hasTasks()) {
-      for (Task<? extends Serializable> parentTask : parentTasks.tasks()) {
-        for (Task<? extends Serializable> childTask : childTasks.tasks()) {
-          parentTask.addDependentTask(childTask);
-        }
-      }
-    } else {
-      for (Task<? extends Serializable> childTask : childTasks.tasks()) {
-        parentTasks.addTask(childTask);
-      }
-    }
-  }
-
-  private void createBuilderTask(List<Task<? extends Serializable>> rootTasks,
-      boolean shouldCreateAnotherLoadTask) {
-  /*
-    use loadTask as dependencyCollection
-   */
-    if (shouldCreateAnotherLoadTask) {
-      Task<ReplLoadWork> loadTask = TaskFactory.get(work, conf);
-      DAGTraversal.traverse(rootTasks, new AddDependencyToLeaves(loadTask));
-    }
-  }
-
-  @Override
-  public StageType getType() {
-    return StageType.REPL_BOOTSTRAP_LOAD;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
deleted file mode 100644
index 048727f..0000000
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec.repl.bootstrap;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
-import 
org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator;
-import 
org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator;
-import org.apache.hadoop.hive.ql.plan.Explain;
-import org.apache.hadoop.hive.ql.session.LineageState;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-@Explain(displayName = "Replication Load Operator", explainLevels = { 
Explain.Level.USER,
-    Explain.Level.DEFAULT,
-    Explain.Level.EXTENDED })
-public class ReplLoadWork implements Serializable {
-  final String dbNameToLoadIn;
-  final String tableNameToLoadIn;
-  final String dumpDirectory;
-  private final BootstrapEventsIterator iterator;
-  private final ConstraintEventsIterator constraintsIterator;
-  private int loadTaskRunCount = 0;
-  private DatabaseEvent.State state = null;
-
-  /*
-  these are sessionState objects that are copied over to work to allow for 
parallel execution.
-  based on the current use case the methods are selectively synchronized, 
which might need to be
-  taken care when using other methods.
-  */
-  final LineageState sessionStateLineageState;
-
-  public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String 
dbNameToLoadIn,
-      String tableNameToLoadIn, LineageState lineageState)
-      throws IOException {
-    this.tableNameToLoadIn = tableNameToLoadIn;
-    sessionStateLineageState = lineageState;
-    this.dumpDirectory = dumpDirectory;
-    this.iterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, 
hiveConf);
-    this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, 
hiveConf);
-    this.dbNameToLoadIn = dbNameToLoadIn;
-  }
-
-  public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String 
dbNameOrPattern,
-      LineageState lineageState) throws IOException {
-    this(hiveConf, dumpDirectory, dbNameOrPattern, null, lineageState);
-  }
-
-  public BootstrapEventsIterator iterator() {
-    return iterator;
-  }
-
-  public ConstraintEventsIterator constraintIterator() {
-    return constraintsIterator;
-  }
-
-  int executedLoadTask() {
-    return ++loadTaskRunCount;
-  }
-
-  void updateDbEventState(DatabaseEvent.State state) {
-    this.state = state;
-  }
-
-  DatabaseEvent databaseEvent(HiveConf hiveConf) {
-    return state.toEvent(hiveConf);
-  }
-
-  boolean hasDbState() {
-    return state != null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
index 89d2ac2..ebe0090 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
@@ -82,6 +82,15 @@ public class BootstrapEventsIterator implements 
Iterator<BootstrapEvent> {
     FileSystem fileSystem = path.getFileSystem(hiveConf);
     FileStatus[] fileStatuses =
         fileSystem.listStatus(new Path(dumpDirectory), 
EximUtil.getDirectoryFilter(fileSystem));
+    if ((fileStatuses == null) || (fileStatuses.length == 0)) {
+      throw new IllegalArgumentException("No data to load in path " + 
dumpDirectory);
+    }
+    if ((dbNameToLoadIn != null) && (fileStatuses.length > 1)) {
+      throw new IllegalArgumentException(
+              "Multiple dirs in "
+                      + dumpDirectory
+                      + " does not correspond to REPL LOAD expecting to load 
to a singular destination point.");
+    }
 
     List<FileStatus> dbsToCreate = Arrays.stream(fileStatuses).filter(f -> {
       Path metadataPath = new Path(f.getPath() + Path.SEPARATOR + 
EximUtil.METADATA_NAME);

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
index 26f4892..d09b98c 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadConstraint.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.ConstraintEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
index 7ddae6f..0fd305a 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadDatabase.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -31,8 +32,8 @@ import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.PrincipalDesc;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils.ReplLoadOpType;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.ReplLoadOpType;
 
 import java.io.Serializable;
 import java.util.HashMap;

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
index b886ff4..a7c8ca4 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
@@ -26,9 +26,10 @@ import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.AddDependencyToLeaves;
+import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.EximUtil;

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java
deleted file mode 100644
index f8f0801..0000000
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load;
-
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.AddDependencyToLeaves;
-import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * This class will be responsible to track how many tasks have been created,
- * organization of tasks such that after the number of tasks for next 
execution are created
- * we create a dependency collection task(DCT) -> another bootstrap task,
- * and then add DCT as dependent to all existing tasks that are created so the 
cycle can continue.
- */
-public class TaskTracker {
-  private static Logger LOG = LoggerFactory.getLogger(TaskTracker.class);
-  /**
-   * used to identify the list of tasks at root level for a given level like 
table /  db / partition.
-   * this does not include the task dependency notion of "table tasks < ---- 
partition task"
-   */
-  private final List<Task<? extends Serializable>> tasks = new ArrayList<>();
-  private ReplicationState replicationState = null;
-  // since tasks themselves can be graphs we want to limit the number of 
created
-  // tasks including all of dependencies.
-  private int numberOfTasks = 0;
-  private final int maxTasksAllowed;
-
-  public TaskTracker(int defaultMaxTasks) {
-    maxTasksAllowed = defaultMaxTasks;
-  }
-
-  public TaskTracker(TaskTracker existing) {
-    maxTasksAllowed = existing.maxTasksAllowed - existing.numberOfTasks;
-  }
-
-  /**
-   * this method is used to identify all the tasks in a graph.
-   * the graph however might get created in a disjoint fashion, in which case 
we can just update
-   * the number of tasks using the "update" method.
-   */
-  public void addTask(Task<? extends Serializable> task) {
-    tasks.add(task);
-
-    List <Task<? extends Serializable>> visited = new ArrayList<>();
-    updateTaskCount(task, visited);
-  }
-
-  // This method is used to traverse the DAG created in tasks list and add the 
dependent task to
-  // the tail of each task chain.
-  public void addDependentTask(Task<? extends Serializable> dependent) {
-    if (tasks.isEmpty()) {
-      addTask(dependent);
-    } else {
-      DAGTraversal.traverse(tasks, new AddDependencyToLeaves(dependent));
-
-      List<Task<? extends Serializable>> visited = new ArrayList<>();
-      updateTaskCount(dependent, visited);
-    }
-  }
-
-  private void updateTaskCount(Task<? extends Serializable> task,
-                               List <Task<? extends Serializable>> visited) {
-    numberOfTasks += 1;
-    visited.add(task);
-    if (task.getChildTasks() != null) {
-      for (Task<? extends Serializable> childTask : task.getChildTasks()) {
-        if (visited.contains(childTask)) {
-          continue;
-        }
-        updateTaskCount(childTask, visited);
-      }
-    }
-  }
-
-  public boolean canAddMoreTasks() {
-    return numberOfTasks < maxTasksAllowed;
-  }
-
-  public boolean hasTasks() {
-    return numberOfTasks != 0;
-  }
-
-  public void update(TaskTracker withAnother) {
-    numberOfTasks += withAnother.numberOfTasks;
-    if (withAnother.hasReplicationState()) {
-      this.replicationState = withAnother.replicationState;
-    }
-  }
-
-  public void setReplicationState(ReplicationState state) {
-    this.replicationState = state;
-  }
-
-  public boolean hasReplicationState() {
-    return replicationState != null;
-  }
-
-  public ReplicationState replicationState() {
-    return replicationState;
-  }
-
-  public List<Task<? extends Serializable>> tasks() {
-    return tasks;
-  }
-
-  public void debugLog(String forEventType) {
-    LOG.debug("{} event with total / root number of tasks:{}/{}", 
forEventType, numberOfTasks,
-        tasks.size());
-  }
-
-  public int numberOfTasks() {
-    return numberOfTasks;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index f6493f7..c0cfc43 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -27,11 +27,11 @@ import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils.ReplLoadOpType;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.ReplLoadOpType;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils;

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index 419a511..089b529 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -27,10 +27,10 @@ import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils.ReplLoadOpType;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.ReplLoadOpType;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils;

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java
index b5b5b90..8e01fb1 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/TableContext.java
@@ -18,7 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
 

Reply via email to