Repository: hive
Updated Branches:
  refs/heads/branch-3 c191ea5e9 -> 150ef3ba5


http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java
new file mode 100644
index 0000000..4b37c8d
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadEventsIterator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.incremental;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * IncrementalLoadEventsIterator
+ * Helper class to iterate through event dump directory.
+ */
+public class IncrementalLoadEventsIterator implements Iterator<FileStatus> {
+  private FileStatus[] eventDirs;
+  private int currentIndex;
+  private int numEvents;
+
+  public IncrementalLoadEventsIterator(String loadPath, HiveConf conf) throws 
IOException {
+    Path eventPath = new Path(loadPath);
+    FileSystem fs = eventPath.getFileSystem(conf);
+    eventDirs = fs.listStatus(eventPath, EximUtil.getDirectoryFilter(fs));
+    if ((eventDirs == null) || (eventDirs.length == 0)) {
+      throw new IllegalArgumentException("No data to load in path " + 
loadPath);
+    }
+    // For event dump, each sub-dir is an individual event dump.
+    // We need to guarantee that the directory listing we got is in order of 
event id.
+    Arrays.sort(eventDirs, new EventDumpDirComparator());
+    currentIndex = 0;
+    numEvents = eventDirs.length;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return (eventDirs != null && currentIndex < numEvents);
+  }
+
+  @Override
+  public FileStatus next() {
+    if (hasNext()) {
+      return eventDirs[currentIndex++];
+    } else {
+      throw new NoSuchElementException("no more events");
+    }
+  }
+
+  public int getNumEvents() {
+    return numEvents;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
new file mode 100644
index 0000000..94e6425
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.incremental;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork;
+import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
+import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
+import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
+import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;
+import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
+import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
+import org.apache.hadoop.hive.ql.stats.StatsUtils;
+import org.slf4j.Logger;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.HashSet;
+
+/**
+ * IncrementalLoadTasksBuilder
+ * Iterates through the incremental dump directory and creates tasks to load the events.
+ * Each {@link #build} call consumes events from the shared iterator until the iterator is
+ * exhausted or the task budget ({@code REPL_APPROX_MAX_LOAD_TASKS}) is reached, so a large
+ * dump may be loaded over several iterations.
+ */
+public class IncrementalLoadTasksBuilder {
+  private final String dbName, tableName;
+  private final IncrementalLoadEventsIterator iterator;
+  private final HashSet<ReadEntity> inputs;
+  private final HashSet<WriteEntity> outputs;
+  // Supplied by build(); null until build() has been called at least once.
+  private Logger log;
+  private final HiveConf conf;
+  private final ReplLogger replLogger;
+  // NOTE(review): static, so it is shared by every builder in the JVM and reset whenever a new
+  // builder is constructed. Kept static because getNumIteration() is a public static accessor.
+  private static long numIteration;
+
+  public IncrementalLoadTasksBuilder(String dbName, String tableName, String loadPath,
+                                     IncrementalLoadEventsIterator iterator, HiveConf conf) {
+    this.dbName = dbName;
+    this.tableName = tableName;
+    this.iterator = iterator;
+    inputs = new HashSet<>();
+    outputs = new HashSet<>();
+    log = null;
+    this.conf = conf;
+    replLogger = new IncrementalLoadLogger(dbName, loadPath, iterator.getNumEvents());
+    numIteration = 0;
+    replLogger.startLog();
+  }
+
+  /**
+   * Builds the task chain for the next batch of events.
+   *
+   * @param driverContext driver context whose {@code Context} is passed to the message handlers
+   * @param hive Hive client passed to the message handlers
+   * @param log logger used by this builder from now on
+   * @return the root (a dependency-collection task) of the generated task chain
+   * @throws Exception if reading an event's dump metadata or analyzing an event fails
+   */
+  public Task<? extends Serializable> build(DriverContext driverContext, Hive hive, Logger log)
+      throws Exception {
+    Task<? extends Serializable> evTaskRoot = TaskFactory.get(new DependencyCollectionWork());
+    Task<? extends Serializable> taskChainTail = evTaskRoot;
+    Long lastReplayedEvent = null;
+    this.log = log;
+    numIteration++;
+    this.log.debug("Iteration num {}", numIteration);
+    TaskTracker tracker = new TaskTracker(conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS));
+
+    while (iterator.hasNext() && tracker.canAddMoreTasks()) {
+      FileStatus dir = iterator.next();
+      String location = dir.getPath().toUri().toString();
+      DumpMetaData eventDmd = new DumpMetaData(new Path(location), conf);
+
+      if (!shouldReplayEvent(dir, eventDmd.getDumpType(), dbName, tableName)) {
+        this.log.debug("Skipping event {} from {} for table {}.{} maxTasks: {}",
+                eventDmd.getDumpType(), dir.getPath().toUri(), dbName, tableName, tracker.numberOfTasks());
+        continue;
+      }
+
+      this.log.debug("Loading event {} from {} for table {}.{} maxTasks: {}",
+              eventDmd.getDumpType(), dir.getPath().toUri(), dbName, tableName, tracker.numberOfTasks());
+
+      // Event loads behave like table loads, with one crucial difference: precursor order is
+      // strict, and each event must be processed after the previous one.
+      // The strict order is handled as follows:
+      // We start with a taskChainTail which is a dummy no-op task (a DependencyCollectionTask)
+      // at the head of our event chain. For each event we process, we tell analyzeEventLoad to
+      // create tasks that use the taskChainTail as a dependency. Then we collect all those tasks
+      // and introduce a new barrier task (a ReplStateLogTask) which depends on all of them.
+      // This barrier task becomes our new taskChainTail. Thus, we get a set of tasks as follows:
+      //
+      //                 --->ev1.task1--                          --->ev2.task1--
+      //                /               \                        /               \
+      //  evTaskRoot-->*---->ev1.task2---*--> ev1.barrierTask-->*---->ev2.task2---*->evTaskChainTail
+      //                \               /
+      //                 --->ev1.task3--
+      //
+      // Once this entire chain is generated, evTaskRoot is returned so that the caller can
+      // execute the entire chain.
+
+      MessageHandler.Context context = new MessageHandler.Context(dbName, tableName, location,
+              taskChainTail, eventDmd, conf, hive, driverContext.getCtx(), this.log);
+      List<Task<? extends Serializable>> evTasks = analyzeEventLoad(context);
+
+      if ((evTasks != null) && (!evTasks.isEmpty())) {
+        ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger,
+                dir.getPath().getName(),
+                eventDmd.getDumpType().toString());
+        Task<? extends Serializable> barrierTask = TaskFactory.get(replStateLogWork);
+        AddDependencyToLeaves function = new AddDependencyToLeaves(barrierTask);
+        DAGTraversal.traverse(evTasks, function);
+        this.log.debug("Updated taskChainTail from {}:{} to {}:{}",
+                taskChainTail.getClass(), taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId());
+        tracker.addTaskList(taskChainTail.getChildTasks());
+        taskChainTail = barrierTask;
+      }
+      lastReplayedEvent = eventDmd.getEventTo();
+    }
+
+    // Only once every event has been consumed is the database's last replicated state updated.
+    if (!evTaskRoot.equals(taskChainTail) && !iterator.hasNext()) {
+      Map<String, String> dbProps = new HashMap<>();
+      dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), String.valueOf(lastReplayedEvent));
+      ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, dbProps);
+      Task<? extends Serializable> barrierTask = TaskFactory.get(replStateLogWork, conf);
+      taskChainTail.addDependentTask(barrierTask);
+      this.log.debug("Added {}:{} as a precursor of barrier task {}:{}",
+              taskChainTail.getClass(), taskChainTail.getId(),
+              barrierTask.getClass(), barrierTask.getId());
+    }
+    log.info("Iteration {} done with num task : {}, lastReplayedEvent : {}",
+            numIteration, tracker.numberOfTasks(), lastReplayedEvent);
+    return evTaskRoot;
+  }
+
+  /**
+   * Returns false when the object's {@code CURR_STATE_ID} property shows that the event in
+   * {@code dir} (whose directory name is the event id) has already been replayed.
+   */
+  private boolean isEventNotReplayed(Map<String, String> params, FileStatus dir, DumpType dumpType) {
+    if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
+      String replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+      long lastReplId = Long.parseLong(replLastId);
+      long eventId = Long.parseLong(dir.getPath().getName());
+      if (lastReplId >= eventId) {
+        log.debug("Event {} with replId {} is already replayed. LastReplId - {}",
+                dumpType, eventId, lastReplId);
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Decides whether the event in {@code dir} still needs to be replayed, based on the
+   * replication state stored on the target database (db-level load) or table (table-level load).
+   */
+  private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType, String dbName, String tableName) {
+    // if database itself is null then we can not filter out anything.
+    if (dbName == null || dbName.isEmpty()) {
+      return true;
+    } else if ((tableName == null) || (tableName.isEmpty())) {
+      Database database;
+      try {
+        database = Hive.get().getDatabase(dbName);
+      } catch (HiveException e) {
+        // may be the db is getting created in this load
+        log.debug("failed to get the database {}", dbName);
+        return true;
+      }
+      if (database == null) {
+        // Hive#getDatabase may report a missing database with null instead of an exception;
+        // the db may be getting created in this load, so nothing can be filtered out.
+        log.debug("failed to get the database {}", dbName);
+        return true;
+      }
+      return isEventNotReplayed(database.getParameters(), dir, dumpType);
+    } else {
+      Table tbl;
+      try {
+        tbl = Hive.get().getTable(dbName, tableName);
+      } catch (HiveException e) {
+        // may be the table is getting created in this load
+        log.debug("failed to get the table {}.{}", dbName, tableName);
+        return true;
+      }
+      if (tbl == null) {
+        log.debug("failed to get the table {}.{}", dbName, tableName);
+        return true;
+      }
+      return isEventNotReplayed(tbl.getParameters(), dir, dumpType);
+    }
+  }
+
+  /**
+   * Creates the tasks for one event, makes {@code context.precursor} (if any) their parent, and
+   * appends the repl-state update tasks for the objects the event touched.
+   */
+  private List<Task<? extends Serializable>> analyzeEventLoad(MessageHandler.Context context)
+      throws SemanticException {
+    MessageHandler messageHandler = context.dmd.getDumpType().handler();
+    List<Task<? extends Serializable>> tasks = messageHandler.handle(context);
+    if (tasks == null) {
+      // Treat a null result like "no tasks" rather than failing with a NullPointerException.
+      tasks = new ArrayList<>();
+    }
+
+    if (context.precursor != null) {
+      for (Task<? extends Serializable> t : tasks) {
+        context.precursor.addDependentTask(t);
+        log.debug("Added {}:{} as a precursor of {}:{}",
+                context.precursor.getClass(), context.precursor.getId(), t.getClass(), t.getId());
+      }
+    }
+
+    inputs.addAll(messageHandler.readEntities());
+    outputs.addAll(messageHandler.writeEntities());
+    return addUpdateReplStateTasks(StringUtils.isEmpty(context.tableName),
+            messageHandler.getUpdatedMetadata(), tasks);
+  }
+
+  /** Creates an ALTER TABLE ADDPROPS task that stores {@code replState} on the table/partition. */
+  private Task<? extends Serializable> tableUpdateReplStateTask(String dbName, String tableName,
+                                                    Map<String, String> partSpec, String replState,
+                                                    Task<? extends Serializable> preCursor)
+      throws SemanticException {
+    HashMap<String, String> mapProp = new HashMap<>();
+    mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState);
+
+    AlterTableDesc alterTblDesc =  new AlterTableDesc(
+            AlterTableDesc.AlterTableTypes.ADDPROPS, new ReplicationSpec(replState, replState));
+    alterTblDesc.setProps(mapProp);
+    alterTblDesc.setOldName(StatsUtils.getFullyQualifiedTableName(dbName, tableName));
+    HashMap<String, String> partSpecMap;
+    if (partSpec == null || partSpec instanceof HashMap) {
+      partSpecMap = (HashMap<String, String>) partSpec;
+    } else {
+      // setPartSpec takes a HashMap; copy other Map types instead of failing with a ClassCastException.
+      partSpecMap = new HashMap<>(partSpec);
+    }
+    alterTblDesc.setPartSpec(partSpecMap);
+
+    Task<? extends Serializable> updateReplIdTask =
+            TaskFactory.get(new DDLWork(inputs, outputs, alterTblDesc), conf);
+
+    // Link the update repl state task with dependency collection task
+    if (preCursor != null) {
+      preCursor.addDependentTask(updateReplIdTask);
+      log.debug("Added {}:{} as a precursor of {}:{}", preCursor.getClass(), preCursor.getId(),
+              updateReplIdTask.getClass(), updateReplIdTask.getId());
+    }
+    return updateReplIdTask;
+  }
+
+  /** Creates an ALTER DATABASE task that stores {@code replState} on the database. */
+  private Task<? extends Serializable> dbUpdateReplStateTask(String dbName, String replState,
+                                                             Task<? extends Serializable> preCursor) {
+    HashMap<String, String> mapProp = new HashMap<>();
+    mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState);
+
+    AlterDatabaseDesc alterDbDesc =
+            new AlterDatabaseDesc(dbName, mapProp, new ReplicationSpec(replState, replState));
+    Task<? extends Serializable> updateReplIdTask =
+            TaskFactory.get(new DDLWork(inputs, outputs, alterDbDesc), conf);
+
+    // Link the update repl state task with dependency collection task
+    if (preCursor != null) {
+      preCursor.addDependentTask(updateReplIdTask);
+      log.debug("Added {}:{} as a precursor of {}:{}", preCursor.getClass(), preCursor.getId(),
+              updateReplIdTask.getClass(), updateReplIdTask.getId());
+    }
+    return updateReplIdTask;
+  }
+
+  /**
+   * Appends repl-state update tasks (partition, table and, for db-level loads, database) behind a
+   * barrier that depends on every import task.
+   *
+   * @return the update tasks if any were created, otherwise {@code importTasks} unchanged
+   */
+  private List<Task<? extends Serializable>> addUpdateReplStateTasks(
+          boolean isDatabaseLoad,
+          UpdatedMetaDataTracker updatedMetaDataTracker,
+          List<Task<? extends Serializable>> importTasks) throws SemanticException {
+    // If no import tasks generated by the event then no need to update the repl state to any object.
+    if (importTasks.isEmpty()) {
+      log.debug("No objects need update of repl state: 0 import tasks");
+      return importTasks;
+    }
+
+    // Create a barrier task for dependency collection of import tasks
+    Task<? extends Serializable> barrierTask = TaskFactory.get(new DependencyCollectionWork(), conf);
+
+    List<Task<? extends Serializable>> tasks = new ArrayList<>();
+    Task<? extends Serializable> updateReplIdTask;
+
+    for (UpdatedMetaDataTracker.UpdateMetaData updateMetaData : updatedMetaDataTracker.getUpdateMetaDataList()) {
+      String replState = updateMetaData.getReplState();
+      String dbName = updateMetaData.getDbName();
+      String tableName = updateMetaData.getTableName();
+      // If any partition is updated, then update repl state in partition object
+      for (final Map<String, String> partSpec : updateMetaData.getPartitionsList()) {
+        updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, partSpec, replState, barrierTask);
+        tasks.add(updateReplIdTask);
+      }
+
+      if (tableName != null) {
+        // If any table/partition is updated, then update repl state in table object
+        updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, null, replState, barrierTask);
+        tasks.add(updateReplIdTask);
+      }
+
+      // For table level load, need not update replication state for the database
+      if (isDatabaseLoad) {
+        // If any table/partition is updated, then update repl state in db object
+        updateReplIdTask = dbUpdateReplStateTask(dbName, replState, barrierTask);
+        tasks.add(updateReplIdTask);
+      }
+    }
+
+    if (tasks.isEmpty()) {
+      log.debug("No objects need update of repl state: 0 update tracker tasks");
+      return importTasks;
+    }
+
+    // Link import tasks to the barrier task which will in-turn linked with repl state update tasks
+    for (Task<? extends Serializable> t : importTasks){
+      t.addDependentTask(barrierTask);
+      log.debug("Added {}:{} as a precursor of barrier task {}:{}",
+               t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId());
+    }
+
+    // At least one task would have been added to update the repl state
+    return tasks;
+  }
+
+  /** @return number of build() calls since the most recent builder was constructed (JVM-wide). */
+  public static long getNumIteration() {
+    return numIteration;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/AddDependencyToLeaves.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/AddDependencyToLeaves.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/AddDependencyToLeaves.java
new file mode 100644
index 0000000..284796f
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/AddDependencyToLeaves.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * {@link DAGTraversal.Function} that attaches the given post-dependency task(s) as children of
+ * every leaf task (a task without child tasks) reached during the traversal.
+ */
+public class AddDependencyToLeaves implements DAGTraversal.Function {
+  private final List<Task<? extends Serializable>> postDependencyCollectionTasks;
+
+  /**
+   * @param postDependencyCollectionTasks tasks to add below every leaf; they are also skipped
+   *                                      by the traversal so they are never treated as leaves
+   */
+  public AddDependencyToLeaves(List<Task<? extends Serializable>> postDependencyCollectionTasks) {
+    this.postDependencyCollectionTasks = postDependencyCollectionTasks;
+  }
+
+  /** Convenience constructor for a single post-dependency task. */
+  public AddDependencyToLeaves(Task<? extends Serializable> postDependencyTask) {
+    this(Collections.singletonList(postDependencyTask));
+  }
+
+  @Override
+  public void process(Task<? extends Serializable> task) {
+    List<Task<? extends Serializable>> children = task.getChildTasks();
+    // A task whose child list exists but is empty is a leaf as well and must get the dependency.
+    if (children == null || children.isEmpty()) {
+      postDependencyCollectionTasks.forEach(task::addDependentTask);
+    }
+  }
+
+  @Override
+  public boolean skipProcessing(Task<? extends Serializable> task) {
+    return postDependencyCollectionTasks.contains(task);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
new file mode 100644
index 0000000..00a8317
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.util;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
+import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
+import org.apache.hadoop.hive.ql.stats.StatsUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Static helpers shared by the REPL LOAD code paths.
+ */
+public class ReplUtils {
+
+  /** Property key under which a bootstrap load records its dump root as a checkpoint. */
+  public static final String REPL_CHECKPOINT_KEY = "hive.repl.ckpt.key";
+
+  /**
+   * Bootstrap REPL LOAD operation type on the examined object based on ckpt state.
+   */
+  public enum ReplLoadOpType {
+    LOAD_NEW, LOAD_SKIP, LOAD_REPLACE
+  }
+
+  /**
+   * Converts partition key/value maps into {@code key = value AND ...} predicates.
+   *
+   * @param table table whose partition columns supply the column types
+   * @param partitions partition specs; all are expected to have the same number of keys
+   * @return map from the key count of the first spec to the list of predicates,
+   *         or an empty map if no predicate was produced
+   * @throws SemanticException if a key is not a partition column of {@code table}
+   */
+  public static Map<Integer, List<ExprNodeGenericFuncDesc>> genPartSpecs(
+          Table table, List<Map<String, String>> partitions) throws SemanticException {
+    Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs = new HashMap<>();
+    int partPrefixLength = 0;
+    if (partitions.size() > 0) {
+      // pick the length of the first ptn, we expect all ptns listed to have the same number of
+      // key-vals.
+      partPrefixLength = partitions.get(0).size();
+    }
+    List<ExprNodeGenericFuncDesc> partitionDesc = new ArrayList<>();
+    for (Map<String, String> ptn : partitions) {
+      // convert each key-value-map to appropriate expression.
+      ExprNodeGenericFuncDesc expr = null;
+      for (Map.Entry<String, String> kvp : ptn.entrySet()) {
+        String key = kvp.getKey();
+        Object val = kvp.getValue();
+        if (table.getPartColByName(key) == null) {
+          throw new SemanticException("Partition column " + key + " not found in table "
+                  + table.getTableName());
+        }
+        String type = table.getPartColByName(key).getType();
+        PrimitiveTypeInfo pti = TypeInfoFactory.getPrimitiveTypeInfo(type);
+        ExprNodeColumnDesc column = new ExprNodeColumnDesc(pti, key, null, true);
+        ExprNodeGenericFuncDesc op = DDLSemanticAnalyzer.makeBinaryPredicate(
+                "=", column, new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo, val));
+        expr = (expr == null) ? op : DDLSemanticAnalyzer.makeBinaryPredicate("and", expr, op);
+      }
+      if (expr != null) {
+        partitionDesc.add(expr);
+      }
+    }
+    if (partitionDesc.size() > 0) {
+      partSpecs.put(partPrefixLength, partitionDesc);
+    }
+    return partSpecs;
+  }
+
+  /** Creates a task that logs, via {@code replLogger}, the load of the described table. */
+  public static Task<?> getTableReplLogTask(ImportTableDesc tableDesc, ReplLogger replLogger, HiveConf conf)
+          throws SemanticException {
+    ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, tableDesc.getTableName(), tableDesc.tableType());
+    return TaskFactory.get(replLogWork, conf);
+  }
+
+  /**
+   * Creates an ALTER TABLE ADDPROPS task that stores {@code dumpRoot} under
+   * {@link #REPL_CHECKPOINT_KEY} on the table, or on the partition when {@code partSpec} is set.
+   */
+  public static Task<?> getTableCheckpointTask(ImportTableDesc tableDesc, HashMap<String, String> partSpec,
+                                               String dumpRoot, HiveConf conf) throws SemanticException {
+    HashMap<String, String> mapProp = new HashMap<>();
+    mapProp.put(REPL_CHECKPOINT_KEY, dumpRoot);
+
+    AlterTableDesc alterTblDesc =  new AlterTableDesc(AlterTableDesc.AlterTableTypes.ADDPROPS);
+    alterTblDesc.setProps(mapProp);
+    alterTblDesc.setOldName(
+            StatsUtils.getFullyQualifiedTableName(tableDesc.getDatabaseName(), tableDesc.getTableName()));
+    if (partSpec != null) {
+      alterTblDesc.setPartSpec(partSpec);
+    }
+    return TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), alterTblDesc), conf);
+  }
+
+  /**
+   * Checks the bootstrap checkpoint recorded in {@code props}.
+   *
+   * @return true if the checkpoint equals {@code dumpRoot}, false if no (or an empty) checkpoint is set
+   * @throws InvalidOperationException if a checkpoint from a different dump root is present
+   */
+  public static boolean replCkptStatus(String dbName, Map<String, String> props, String dumpRoot)
+          throws InvalidOperationException {
+    String ckptRoot = (props == null) ? null : props.get(REPL_CHECKPOINT_KEY);
+    // If ckpt property not set or empty means, bootstrap is not run on this object.
+    if (ckptRoot == null || ckptRoot.isEmpty()) {
+      return false;
+    }
+    if (ckptRoot.equals(dumpRoot)) {
+      return true;
+    }
+    throw new InvalidOperationException(
+            ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.format(dumpRoot, ckptRoot));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/TaskTracker.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/TaskTracker.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/TaskTracker.java
new file mode 100644
index 0000000..1d01bc9
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/TaskTracker.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
+import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tracks how many tasks have been created, so that once the budget for the next execution is
+ * reached a dependency collection task (DCT) followed by another bootstrap task can be created,
+ * with the DCT added as a dependent of all existing tasks so the cycle can continue.
+ */
+public class TaskTracker {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskTracker.class);
+  /**
+   * Root-level tasks for a given level such as table / db / partition.
+   * This does not include the task dependency notion of "table tasks <---- partition task".
+   */
+  private final List<Task<? extends Serializable>> tasks = new ArrayList<>();
+  private ReplicationState replicationState = null;
+  // Since tasks themselves can be graphs, the count includes every task reachable through
+  // dependencies, not only the root-level ones.
+  private int numberOfTasks = 0;
+  private final int maxTasksAllowed;
+
+  public TaskTracker(int defaultMaxTasks) {
+    maxTasksAllowed = defaultMaxTasks;
+  }
+
+  /** Creates a tracker whose budget is whatever {@code existing} has not used yet. */
+  public TaskTracker(TaskTracker existing) {
+    maxTasksAllowed = existing.maxTasksAllowed - existing.numberOfTasks;
+  }
+
+  /**
+   * Adds a root task and counts every task in its graph.
+   * The graph may be created in a disjoint fashion, in which case the count can be adjusted
+   * with {@link #update(TaskTracker)}.
+   */
+  public void addTask(Task<? extends Serializable> task) {
+    tasks.add(task);
+
+    List <Task<? extends Serializable>> visited = new ArrayList<>();
+    updateTaskCount(task, visited);
+  }
+
+  /** Adds several root tasks, counting tasks shared between their graphs only once. */
+  public void addTaskList(List <Task<? extends Serializable>> taskList) {
+    List <Task<? extends Serializable>> visited = new ArrayList<>();
+    for (Task<? extends Serializable> task : taskList) {
+      if (!visited.contains(task)) {
+        tasks.add(task);
+        updateTaskCount(task, visited);
+      }
+    }
+  }
+
+  // Traverses the DAG built from the tracked tasks and adds the dependent task to the tail of
+  // each task chain.
+  public void addDependentTask(Task<? extends Serializable> dependent) {
+    if (tasks.isEmpty()) {
+      addTask(dependent);
+    } else {
+      DAGTraversal.traverse(tasks, new AddDependencyToLeaves(dependent));
+
+      List<Task<? extends Serializable>> visited = new ArrayList<>();
+      updateTaskCount(dependent, visited);
+    }
+  }
+
+  // Depth-first count of task and its not-yet-visited descendants.
+  private void updateTaskCount(Task<? extends Serializable> task,
+                               List <Task<? extends Serializable>> visited) {
+    numberOfTasks += 1;
+    visited.add(task);
+    if (task.getChildTasks() != null) {
+      for (Task<? extends Serializable> childTask : task.getChildTasks()) {
+        if (visited.contains(childTask)) {
+          continue;
+        }
+        updateTaskCount(childTask, visited);
+      }
+    }
+  }
+
+  public boolean canAddMoreTasks() {
+    return numberOfTasks < maxTasksAllowed;
+  }
+
+  public boolean hasTasks() {
+    return numberOfTasks != 0;
+  }
+
+  /** Adds the other tracker's task count and adopts its replication state, if it has one. */
+  public void update(TaskTracker withAnother) {
+    numberOfTasks += withAnother.numberOfTasks;
+    if (withAnother.hasReplicationState()) {
+      this.replicationState = withAnother.replicationState;
+    }
+  }
+
+  public void setReplicationState(ReplicationState state) {
+    this.replicationState = state;
+  }
+
+  public boolean hasReplicationState() {
+    return replicationState != null;
+  }
+
+  public ReplicationState replicationState() {
+    return replicationState;
+  }
+
+  /** @return the live (mutable) list of root-level tasks */
+  public List<Task<? extends Serializable>> tasks() {
+    return tasks;
+  }
+
+  public void debugLog(String forEventType) {
+    LOG.debug("{} event with total / root number of tasks:{}/{}", forEventType, numberOfTasks,
+        tasks.size());
+  }
+
+  public int numberOfTasks() {
+    return numberOfTasks;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java
index a91f45e..cf54aa3 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork;
+import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.parse.GenTezProcContext;
 import org.apache.hadoop.hive.ql.parse.GenTezWork;

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 0a5ecf9..0a535d1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index c1b98ae..d746cd6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import org.antlr.runtime.tree.Tree;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -30,34 +29,18 @@ import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
-import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork;
+import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
-import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
-import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger;
-import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;
-import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
-import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
-import org.apache.hadoop.hive.ql.plan.DDLWork;
-import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.stats.StatsUtils;
 
 import java.io.FileNotFoundException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
+
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -267,45 +250,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     }
   }
 
-  private boolean isEventNotReplayed(Map<String, String> params, FileStatus 
dir, DumpType dumpType) {
-    if (params != null && 
(params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
-      String replLastId = 
params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
-      if (Long.parseLong(replLastId) >= 
Long.parseLong(dir.getPath().getName())) {
-        LOG.debug("Event " + dumpType + " with replId " + 
Long.parseLong(dir.getPath().getName())
-                + " is already replayed. LastReplId - " +  
Long.parseLong(replLastId));
-        return false;
-      }
-    }
-    return true;
-  }
-
-  private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType) throws 
SemanticException {
-    // if database itself is null then we can not filter out anything.
-    if (dbNameOrPattern == null || dbNameOrPattern.isEmpty()) {
-      return true;
-    } else if ((tblNameOrPattern == null) || (tblNameOrPattern.isEmpty())) {
-      Database database;
-      try {
-        database = Hive.get().getDatabase(dbNameOrPattern);
-        return isEventNotReplayed(database.getParameters(), dir, dumpType);
-      } catch (HiveException e) {
-        //may be the db is getting created in this load
-        LOG.debug("failed to get the database " + dbNameOrPattern);
-        return true;
-      }
-    } else {
-      Table tbl;
-      try {
-        tbl = Hive.get().getTable(dbNameOrPattern, tblNameOrPattern);
-        return isEventNotReplayed(tbl.getParameters(), dir, dumpType);
-      } catch (HiveException e) {
-        // may be the table is getting created in this load
-        LOG.debug("failed to get the table " + dbNameOrPattern + "." + 
tblNameOrPattern);
-        return true;
-      }
-    }
-  }
-
   /*
    * Example dump dirs we need to be able to handle :
    *
@@ -397,7 +341,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
 
       if ((!evDump) && (tblNameOrPattern != null) && 
!(tblNameOrPattern.isEmpty())) {
         ReplLoadWork replLoadWork = new ReplLoadWork(conf, 
loadPath.toString(), dbNameOrPattern,
-                tblNameOrPattern, queryState.getLineageState());
+                tblNameOrPattern, queryState.getLineageState(), false);
         rootTasks.add(TaskFactory.get(replLoadWork, conf));
         return;
       }
@@ -408,242 +352,15 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         return;
       }
 
-      FileStatus[] dirsInLoadPath = fs.listStatus(loadPath, 
EximUtil.getDirectoryFilter(fs));
-
-      if ((dirsInLoadPath == null) || (dirsInLoadPath.length == 0)) {
-        throw new IllegalArgumentException("No data to load in path " + 
loadPath.toUri().toString());
-      }
-
-      if (!evDump){
-        // not an event dump, not a table dump - thus, a db dump
-        if ((dbNameOrPattern != null) && (dirsInLoadPath.length > 1)) {
-          LOG.debug("Found multiple dirs when we expected 1:");
-          for (FileStatus d : dirsInLoadPath) {
-            LOG.debug("> " + d.getPath().toUri().toString());
-          }
-          throw new IllegalArgumentException(
-              "Multiple dirs in "
-                  + loadPath.toUri().toString()
-                  + " does not correspond to REPL LOAD expecting to load to a 
singular destination point.");
-        }
-
-        ReplLoadWork replLoadWork = new ReplLoadWork(conf, 
loadPath.toString(), dbNameOrPattern,
-                queryState.getLineageState());
-        rootTasks.add(TaskFactory.get(replLoadWork, conf));
-        //
-        //        for (FileStatus dir : dirsInLoadPath) {
-        //          analyzeDatabaseLoad(dbNameOrPattern, fs, dir);
-        //        }
-      } else {
-        // Event dump, each sub-dir is an individual event dump.
-        // We need to guarantee that the directory listing we got is in order 
of evid.
-        Arrays.sort(dirsInLoadPath, new EventDumpDirComparator());
-
-        Task<? extends Serializable> evTaskRoot = TaskFactory.get(new 
DependencyCollectionWork());
-        Task<? extends Serializable> taskChainTail = evTaskRoot;
-
-        ReplLogger replLogger = new IncrementalLoadLogger(dbNameOrPattern,
-                loadPath.toString(), dirsInLoadPath.length);
-
-        for (FileStatus dir : dirsInLoadPath){
-          String locn = dir.getPath().toUri().toString();
-          DumpMetaData eventDmd = new DumpMetaData(new Path(locn), conf);
-
-          if (!shouldReplayEvent(dir, eventDmd.getDumpType())) {
-            continue;
-          }
-
-          LOG.debug("Loading event from {} to {}.{}", dir.getPath().toUri(), 
dbNameOrPattern, tblNameOrPattern);
-
-          // event loads will behave similar to table loads, with one crucial 
difference
-          // precursor order is strict, and each event must be processed after 
the previous one.
-          // The way we handle this strict order is as follows:
-          // First, we start with a taskChainTail which is a dummy noop task 
(a DependecyCollectionTask)
-          // at the head of our event chain. For each event we process, we 
tell analyzeTableLoad to
-          // create tasks that use the taskChainTail as a dependency. Then, we 
collect all those tasks
-          // and introduce a new barrier task(also a DependencyCollectionTask) 
which depends on all
-          // these tasks. Then, this barrier task becomes our new 
taskChainTail. Thus, we get a set of
-          // tasks as follows:
-          //
-          //                 --->ev1.task1--                          
--->ev2.task1--
-          //                /               \                        /         
      \
-          //  evTaskRoot-->*---->ev1.task2---*--> 
ev1.barrierTask-->*---->ev2.task2---*->evTaskChainTail
-          //                \               /
-          //                 --->ev1.task3--
-          //
-          // Once this entire chain is generated, we add evTaskRoot to 
rootTasks, so as to execute the
-          // entire chain
-
-          MessageHandler.Context context = new 
MessageHandler.Context(dbNameOrPattern,
-                                                          tblNameOrPattern, 
locn, taskChainTail,
-                                                          eventDmd, conf, db, 
ctx, LOG);
-          List<Task<? extends Serializable>> evTasks = 
analyzeEventLoad(context);
-
-          if ((evTasks != null) && (!evTasks.isEmpty())){
-            ReplStateLogWork replStateLogWork = new 
ReplStateLogWork(replLogger,
-                                                          
dir.getPath().getName(),
-                                                          
eventDmd.getDumpType().toString());
-            Task<? extends Serializable> barrierTask = 
TaskFactory.get(replStateLogWork);
-            for (Task<? extends Serializable> t : evTasks){
-              t.addDependentTask(barrierTask);
-              LOG.debug("Added {}:{} as a precursor of barrier task {}:{}",
-                  t.getClass(), t.getId(), barrierTask.getClass(), 
barrierTask.getId());
-            }
-            LOG.debug("Updated taskChainTail from {}:{} to {}:{}",
-                taskChainTail.getClass(), taskChainTail.getId(), 
barrierTask.getClass(), barrierTask.getId());
-            taskChainTail = barrierTask;
-          }
-        }
-
-        // If any event is there and db name is known, then dump the start and 
end logs
-        if (!evTaskRoot.equals(taskChainTail)) {
-          Map<String, String> dbProps = new HashMap<>();
-          dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), 
String.valueOf(dmd.getEventTo()));
-          ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, 
dbProps);
-          Task<? extends Serializable> barrierTask = 
TaskFactory.get(replStateLogWork, conf);
-          taskChainTail.addDependentTask(barrierTask);
-          LOG.debug("Added {}:{} as a precursor of barrier task {}:{}",
-                  taskChainTail.getClass(), taskChainTail.getId(),
-                  barrierTask.getClass(), barrierTask.getId());
-
-          replLogger.startLog();
-        }
-        rootTasks.add(evTaskRoot);
-      }
-
+      ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), 
dbNameOrPattern,
+              tblNameOrPattern, queryState.getLineageState(), evDump);
+      rootTasks.add(TaskFactory.get(replLoadWork, conf));
     } catch (Exception e) {
       // TODO : simple wrap & rethrow for now, clean up with error codes
       throw new SemanticException(e.getMessage(), e);
     }
   }
 
-  private List<Task<? extends Serializable>> analyzeEventLoad(
-          MessageHandler.Context context)
-      throws SemanticException {
-    MessageHandler messageHandler = context.dmd.getDumpType().handler();
-    List<Task<? extends Serializable>> tasks = messageHandler.handle(context);
-
-    if (context.precursor != null) {
-      for (Task<? extends Serializable> t : tasks) {
-        context.precursor.addDependentTask(t);
-        LOG.debug("Added {}:{} as a precursor of {}:{}",
-                context.precursor.getClass(), context.precursor.getId(), 
t.getClass(), t.getId());
-      }
-    }
-
-    inputs.addAll(messageHandler.readEntities());
-    outputs.addAll(messageHandler.writeEntities());
-    return addUpdateReplStateTasks(StringUtils.isEmpty(context.tableName),
-                            messageHandler.getUpdatedMetadata(), tasks);
-  }
-
-  private Task<? extends Serializable> tableUpdateReplStateTask(
-                                                        String dbName,
-                                                        String tableName,
-                                                        Map<String, String> 
partSpec,
-                                                        String replState,
-                                                        Task<? extends 
Serializable> preCursor) throws SemanticException {
-    HashMap<String, String> mapProp = new HashMap<>();
-    mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState);
-
-    AlterTableDesc alterTblDesc =  new AlterTableDesc(
-            AlterTableDesc.AlterTableTypes.ADDPROPS, new 
ReplicationSpec(replState, replState));
-    alterTblDesc.setProps(mapProp);
-    alterTblDesc.setOldName(StatsUtils.getFullyQualifiedTableName(dbName, 
tableName));
-    alterTblDesc.setPartSpec((HashMap<String, String>)partSpec);
-
-    Task<? extends Serializable> updateReplIdTask = TaskFactory.get(
-        new DDLWork(inputs, outputs, alterTblDesc), conf);
-
-    // Link the update repl state task with dependency collection task
-    if (preCursor != null) {
-      preCursor.addDependentTask(updateReplIdTask);
-      LOG.debug("Added {}:{} as a precursor of {}:{}",
-              preCursor.getClass(), preCursor.getId(),
-              updateReplIdTask.getClass(), updateReplIdTask.getId());
-    }
-    return updateReplIdTask;
-  }
-
-  private Task<? extends Serializable> dbUpdateReplStateTask(
-                                                        String dbName,
-                                                        String replState,
-                                                        Task<? extends 
Serializable> preCursor) {
-    HashMap<String, String> mapProp = new HashMap<>();
-    mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState);
-
-    AlterDatabaseDesc alterDbDesc = new AlterDatabaseDesc(
-                            dbName, mapProp, new ReplicationSpec(replState, 
replState));
-    Task<? extends Serializable> updateReplIdTask = TaskFactory.get(
-        new DDLWork(inputs, outputs, alterDbDesc), conf);
-
-    // Link the update repl state task with dependency collection task
-    if (preCursor != null) {
-      preCursor.addDependentTask(updateReplIdTask);
-      LOG.debug("Added {}:{} as a precursor of {}:{}",
-              preCursor.getClass(), preCursor.getId(),
-              updateReplIdTask.getClass(), updateReplIdTask.getId());
-    }
-    return updateReplIdTask;
-  }
-
-  private List<Task<? extends Serializable>> addUpdateReplStateTasks(
-          boolean isDatabaseLoad,
-          UpdatedMetaDataTracker updatedMetaDataTracker,
-          List<Task<? extends Serializable>> importTasks) throws 
SemanticException {
-    // If no import tasks generated by the event then no need to update the 
repl state to any object.
-    if (importTasks.isEmpty()) {
-      LOG.debug("No objects need update of repl state: 0 import tasks");
-      return importTasks;
-    }
-
-    // Create a barrier task for dependency collection of import tasks
-    Task<? extends Serializable> barrierTask = TaskFactory.get(new 
DependencyCollectionWork(), conf);
-
-    List<Task<? extends Serializable>> tasks = new ArrayList<>();
-    Task<? extends Serializable> updateReplIdTask;
-
-    // If any partition is updated, then update repl state in partition object
-    for (UpdatedMetaDataTracker.UpdateMetaData updateMetaData : 
updatedMetaDataTracker.getUpdateMetaDataList()) {
-      String replState = updateMetaData.getReplState();
-      String dbName = updateMetaData.getDbName();
-      String tableName = updateMetaData.getTableName();
-      // If any partition is updated, then update repl state in partition 
object
-      for (final Map<String, String> partSpec : 
updateMetaData.getPartitionsList()) {
-        updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, 
partSpec, replState, barrierTask);
-        tasks.add(updateReplIdTask);
-      }
-
-      if (tableName != null) {
-        // If any table/partition is updated, then update repl state in table 
object
-        updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, null, 
replState, barrierTask);
-        tasks.add(updateReplIdTask);
-      }
-
-      // For table level load, need not update replication state for the 
database
-      if (isDatabaseLoad) {
-        // If any table/partition is updated, then update repl state in db 
object
-        updateReplIdTask = dbUpdateReplStateTask(dbName, replState, 
barrierTask);
-        tasks.add(updateReplIdTask);
-      }
-    }
-
-    if (tasks.isEmpty()) {
-      LOG.debug("No objects need update of repl state: 0 update tracker 
tasks");
-      return importTasks;
-    }
-
-    // Link import tasks to the barrier task which will in-turn linked with 
repl state update tasks
-    for (Task<? extends Serializable> t : importTasks){
-      t.addDependentTask(barrierTask);
-      LOG.debug("Added {}:{} as a precursor of barrier task {}:{}",
-              t.getClass(), t.getId(), barrierTask.getClass(), 
barrierTask.getId());
-    }
-
-    // At least one task would have been added to update the repl state
-    return tasks;
-  }
-
   // REPL STATUS
   private void initReplStatus(ASTNode ast) throws SemanticException{
     dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
index 9fdf742..ecde3ce 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/PartitionSerializer.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.io;
 
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.thrift.TException;

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
index 70f4fed..f05c231 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/TableSerializer.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
index b59cdf2..e68e055 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AlterDatabaseHandler.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
index 87a6ff6..0619bd3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.hive.ql.parse.repl.load.message;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
 import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.AddDependencyToLeaves;
+import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
 import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.ReplTxnWork;

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
index 939884d..4a2fdd2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/DropPartitionHandler.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.hive.ql.parse.repl.load.message;
 import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.repl.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.DDLWork;

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeavesTest.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeavesTest.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeavesTest.java
index 309debe..166cf87 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeavesTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/AddDependencyToLeavesTest.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec.repl.bootstrap;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
 import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/hive/blob/150ef3ba/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TestTaskTracker.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TestTaskTracker.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TestTaskTracker.java
index 32b4a72..41ab447 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TestTaskTracker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TestTaskTracker.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load;
 
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;

Reply via email to