HIVE-17426 : Execution framework in hive to run tasks in parallel (Anishek Agarwal, reviewed by Daniel Dai, Thejas Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/39d8d73e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/39d8d73e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/39d8d73e

Branch: refs/heads/master
Commit: 39d8d73e7b7584a48f46ad6b455da6b87e80ea51
Parents: 37be57b
Author: Thejas M Nair <[email protected]>
Authored: Fri Sep 22 20:16:49 2017 -0700
Committer: Thejas M Nair <[email protected]>
Committed: Fri Sep 22 20:16:58 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   | 10 ++--
 ...TestReplicationScenariosAcrossInstances.java | 29 +++++++++
 .../java/org/apache/hadoop/hive/ql/Driver.java  | 15 ++---
 .../apache/hadoop/hive/ql/DriverContext.java    |  2 +-
 .../hadoop/hive/ql/exec/ColumnStatsTask.java    |  3 +-
 .../hive/ql/exec/ColumnStatsUpdateTask.java     |  3 +-
 .../hadoop/hive/ql/exec/ConditionalTask.java    |  7 ++-
 .../apache/hadoop/hive/ql/exec/CopyTask.java    |  1 -
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |  9 +++
 .../hive/ql/exec/ExplainSQRewriteTask.java      |  5 ++
 .../apache/hadoop/hive/ql/exec/ExplainTask.java |  5 ++
 .../apache/hadoop/hive/ql/exec/FetchTask.java   |  4 ++
 .../hadoop/hive/ql/exec/FunctionTask.java       |  8 +++
 .../apache/hadoop/hive/ql/exec/MoveTask.java    | 32 +++++-----
 .../apache/hadoop/hive/ql/exec/NodeUtils.java   |  7 ++-
 .../org/apache/hadoop/hive/ql/exec/Task.java    | 61 +++++++++++--------
 .../hadoop/hive/ql/exec/mr/MapRedTask.java      |  2 -
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java  |  5 ++
 .../ql/exec/repl/bootstrap/ReplLoadTask.java    |  8 ++-
 .../ql/exec/repl/bootstrap/ReplLoadWork.java    | 20 +++++--
 .../bootstrap/load/table/LoadPartitions.java    |  6 +-
 .../repl/bootstrap/load/table/LoadTable.java    |  6 +-
 .../exec/repl/bootstrap/load/util/Context.java  | 15 ++++-
 .../hadoop/hive/ql/hooks/LineageInfo.java       |  2 +-
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java |  3 +-
 .../hadoop/hive/ql/io/merge/MergeFileTask.java  |  1 -
 .../hive/ql/optimizer/GenMapRedUtils.java       |  6 +-
 .../hive/ql/optimizer/lineage/LineageCtx.java   |  3 +-
 .../hive/ql/parse/DDLSemanticAnalyzer.java      | 10 ++--
 .../hive/ql/parse/ImportSemanticAnalyzer.java   | 34 ++++++-----
 .../hive/ql/parse/LoadSemanticAnalyzer.java     | 10 ++--
 .../ql/parse/ReplicationSemanticAnalyzer.java   |  7 ++-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  | 10 +++-
 .../hadoop/hive/ql/parse/TaskCompiler.java      | 62 ++++++++------------
 .../hive/ql/plan/ColumnStatsUpdateWork.java     | 12 +++-
 .../hadoop/hive/ql/plan/ColumnStatsWork.java    | 11 +++-
 .../apache/hadoop/hive/ql/plan/LoadDesc.java    |  2 +-
 .../hadoop/hive/ql/plan/LoadTableDesc.java      | 49 ++++++++++------
 .../apache/hadoop/hive/ql/plan/MoveWork.java    | 25 ++++++--
 .../hadoop/hive/ql/session/LineageState.java    | 26 ++++----
 ...TestGenMapRedUtilsCreateConditionalTask.java | 62 ++++++++++++--------
 41 files changed, 381 insertions(+), 217 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 1ba5968..00d2e46 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -442,11 +442,11 @@ public class HiveConf extends Configuration {
         "Inteval for cmroot cleanup thread."),
     
REPL_FUNCTIONS_ROOT_DIR("hive.repl.replica.functions.root.dir","/user/hive/repl/functions/",
         "Root directory on the replica warehouse where the repl sub-system 
will store jars from the primary warehouse"),
-    REPL_APPROX_MAX_LOAD_TASKS("hive.repl.approx.max.load.tasks", 1000,
-        "Provide and approximate of the max number of tasks that should be 
executed in before  \n" +
-            "dynamically generating the next set of tasks. The number is an 
approximate as we \n" +
-            "will stop at slightly higher number than above, the reason being 
some events might \n" +
-            "lead to an task increment that would cross the above limit"),
+    REPL_APPROX_MAX_LOAD_TASKS("hive.repl.approx.max.load.tasks", 10000,
+        "Provide an approximation of the maximum number of tasks that should 
be executed before \n"
+            + "dynamically generating the next set of tasks. The number is 
approximate as Hive \n"
+            + "will stop at a slightly higher number, the reason being some 
events might lead to a \n"
+            + "task increment that would cross the specified limit."),
     REPL_PARTITIONS_DUMP_PARALLELISM("hive.repl.partitions.dump.parallelism",5,
         "Number of threads that will be used to dump partition data 
information during repl dump."),
     REPL_DUMPDIR_CLEAN_FREQ("hive.repl.dumpdir.clean.freq", "0s",

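The description above promises only an approximate cap: task generation stops
after the configured limit is crossed, not exactly at it. A minimal sketch of
that behavior, under assumed names (TaskBudget and its methods are
illustrative, not Hive's actual TaskTracker API):

class TaskBudget {
  private final int approxMaxTasks; // hive.repl.approx.max.load.tasks
  private int tasksGenerated = 0;

  TaskBudget(int approxMaxTasks) {
    this.approxMaxTasks = approxMaxTasks;
  }

  // Checked before expanding the next replication event into tasks.
  boolean hasCapacity() {
    return tasksGenerated < approxMaxTasks;
  }

  // A single event may add several tasks at once, which is why the final
  // count can land slightly above the configured limit.
  void addTasks(int count) {
    tasksGenerated += count;
  }
}
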
http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 3dee0d2..50c4a98 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -248,4 +248,33 @@ public class TestReplicationScenariosAcrossInstances {
         .run("select country from t2 order by country")
         .verifyResults(new String[] { "france", "india", "us" });
   }
+
+  @Test
+  public void parallelExecutionOfReplicationBootStrapLoad() throws Throwable {
+    WarehouseInstance.Tuple tuple = primary
+        .run("use " + primaryDbName)
+        .run("create table t1 (id int)")
+        .run("create table t2 (place string) partitioned by (country string)")
+        .run("insert into table t2 partition(country='india') values 
('bangalore')")
+        .run("insert into table t2 partition(country='australia') values 
('sydney')")
+        .run("insert into table t2 partition(country='russia') values 
('moscow')")
+        .run("insert into table t2 partition(country='uk') values ('london')")
+        .run("insert into table t2 partition(country='us') values ('sfo')")
+        .run("insert into table t2 partition(country='france') values 
('paris')")
+        .run("insert into table t2 partition(country='japan') values 
('tokyo')")
+        .run("insert into table t2 partition(country='china') values ('hkg')")
+        .run("create table t3 (rank int)")
+        .dump(primaryDbName, null);
+
+    replica.hiveConf.setBoolVar(HiveConf.ConfVars.EXECPARALLEL, true);
+    replica.load(replicatedDbName, tuple.dumpLocation)
+        .run("use " + replicatedDbName)
+        .run("repl status " + replicatedDbName)
+        .verifyResult(tuple.lastReplicationId)
+        .run("show tables")
+        .verifyResults(new String[] { "t1", "t2", "t3" })
+        .run("select country from t2")
+        .verifyResults(Arrays.asList("india", "australia", "russia", "uk", 
"us", "france", "japan",
+            "china"));
+  }
 }

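The test flips hive.exec.parallel on the replica before the load to drive the
parallel path. A minimal sketch of doing the same outside the test harness:

import org.apache.hadoop.hive.conf.HiveConf;

class ParallelConfDemo {
  public static void main(String[] args) {
    // Equivalent to "set hive.exec.parallel=true;" in a HiveQL session.
    HiveConf conf = new HiveConf();
    conf.setBoolVar(HiveConf.ConfVars.EXECPARALLEL, true);
  }
}
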
http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java 
b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 6616cba..e802d42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -39,7 +39,6 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang.StringUtils;
-
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
@@ -90,6 +89,7 @@ import 
org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
 import org.apache.hadoop.hive.ql.parse.ASTNode;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo;
+import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
 import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHook;
 import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
 import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContextImpl;
@@ -100,7 +100,6 @@ import org.apache.hadoop.hive.ql.parse.ParseUtils;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
-import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -108,11 +107,11 @@ import 
org.apache.hadoop.hive.ql.processors.CommandProcessor;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.security.authorization.AuthorizationUtils;
 import 
org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
+import 
org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzContext;
 import 
org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType;
 import 
org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject;
 import 
org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivObjectActionType;
 import 
org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType;
-import 
org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzContext;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.serde2.ByteStream;
@@ -121,9 +120,7 @@ import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-
 import org.apache.hive.common.util.ShutdownHookManager;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1809,9 +1806,9 @@ public class Driver implements CommandProcessor {
       ctx.setHDFSCleanup(true);
       this.driverCxt = driverCxt; // for canceling the query (should be bound 
to session?)
 
-      SessionState.get().setMapRedStats(new LinkedHashMap<String, 
MapRedStats>());
-      SessionState.get().setStackTraces(new HashMap<String, 
List<List<String>>>());
-      SessionState.get().setLocalMapRedErrors(new HashMap<String, 
List<String>>());
+      SessionState.get().setMapRedStats(new LinkedHashMap<>());
+      SessionState.get().setStackTraces(new HashMap<>());
+      SessionState.get().setLocalMapRedErrors(new HashMap<>());
 
       // Add root Tasks to runnable
       for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
@@ -2180,7 +2177,7 @@ public class Driver implements CommandProcessor {
 
     cxt.launching(tskRun);
     // Launch Task
-    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && 
tsk.isMapRedTask()) {
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && 
tsk.canExecuteInParallel()) {
       // Launch it in the parallel mode, as a separate thread only for MR tasks
       if (LOG.isInfoEnabled()){
         LOG.info("Starting task [" + tsk + "] in parallel");

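The condition above is the heart of the change: previously only MR tasks
qualified for a separate thread, while now any task that reports
canExecuteInParallel() does, and the overrides in the files below opt
individual task types out. A self-contained sketch of the contract (SketchTask
and LaunchGate are stand-ins, not Hive's classes):

interface SketchTask extends Runnable {
  // Task types opt out (DDL, explain, fetch, function, repl dump) by
  // overriding this to return false; the base implementation returns true.
  default boolean canExecuteInParallel() { return true; }
}

final class LaunchGate {
  static void launch(SketchTask tsk, boolean execParallel) {
    if (execParallel && tsk.canExecuteInParallel()) {
      new Thread(tsk).start(); // own thread, as MR tasks were before
    } else {
      tsk.run();               // inline in the driver thread
    }
  }
}
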
http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java 
b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
index f43992c..583d3d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
@@ -160,7 +160,7 @@ public class DriverContext {
   public static boolean isLaunchable(Task<? extends Serializable> tsk) {
     // A launchable task is one that hasn't been queued, hasn't been
     // initialized, and is runnable.
-    return !tsk.getQueued() && !tsk.getInitialized() && tsk.isRunnable();
+    return tsk.isNotInitialized() && tsk.isRunnable();
   }
 
   public synchronized boolean addToRunnable(Task<? extends Serializable> tsk) 
throws HiveException {

http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
index 2b2c004..e3b0d1e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
 import org.apache.hadoop.hive.ql.plan.ColumnStatsWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
-import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -330,7 +329,7 @@ public class ColumnStatsTask extends Task<ColumnStatsWork> 
implements Serializab
   private List<ColumnStatistics> constructColumnStatsFromPackedRows(
       Hive db) throws HiveException, MetaException, IOException {
 
-    String currentDb = SessionState.get().getCurrentDatabase();
+    String currentDb = work.getCurrentDatabaseName();
     String tableName = work.getColStats().getTableName();
     String partName = null;
     List<String> colName = work.getColStats().getColName();

http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java
index 82fbf28..48a9c9a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java
@@ -51,7 +51,6 @@ import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc;
 import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
-import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,7 +77,7 @@ public class ColumnStatsUpdateTask extends 
Task<ColumnStatsUpdateWork> {
   private ColumnStatistics constructColumnStatsFromInput()
       throws SemanticException, MetaException {
 
-    String dbName = SessionState.get().getCurrentDatabase();
+    String dbName = work.getCurrentDatabaseName();
     ColumnStatsDesc desc = work.getColStats();
     String tableName = desc.getTableName();
     String partName = work.getPartName();

http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
index 52cb445..a51e69d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
@@ -21,9 +21,7 @@ package org.apache.hadoop.hive.ql.exec;
 import java.io.Serializable;
 import java.util.List;
 
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ConditionalResolver;
 import org.apache.hadoop.hive.ql.plan.ConditionalWork;
@@ -61,6 +59,11 @@ public class ConditionalTask extends Task<ConditionalWork> 
implements Serializab
   }
 
   @Override
+  public boolean canExecuteInParallel() {
+    return isMapRedTask();
+  }
+
+  @Override
   public boolean hasReduce() {
     for (Task<? extends Serializable> task : listTasks) {
       if (task.hasReduce()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
index 2683f29..2e06ad0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.plan.CopyWork;

http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index ad5b3a3..443be54 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -4941,4 +4941,13 @@ public class DDLTask extends Task<DDLWork> implements 
Serializable {
     }
     return retval;
   }
+
+  /*
+  Uses the authorizer from SessionState; some more work is needed to get this to run in parallel.
+  However, this should not be a bottleneck, so it might not need to be parallelized.
+   */
+  @Override
+  public boolean canExecuteInParallel() {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainSQRewriteTask.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainSQRewriteTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainSQRewriteTask.java
index 6fffab0..ec5ebb0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainSQRewriteTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainSQRewriteTask.java
@@ -202,4 +202,9 @@ public class ExplainSQRewriteTask extends 
Task<ExplainSQRewriteWork> implements
     colList.add(tmpFieldSchema);
     return colList;
   }
+
+  @Override
+  public boolean canExecuteInParallel() {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
index c25a783..da76c3c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
@@ -1230,4 +1230,9 @@ public class ExplainTask extends Task<ExplainWork> 
implements Serializable {
     colList.add(tmpFieldSchema);
     return colList;
   }
+
+  @Override
+  public boolean canExecuteInParallel() {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
index e708d58..bde052b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
@@ -193,4 +193,8 @@ public class FetchTask extends Task<FetchWork> implements 
Serializable {
     }
   }
 
+  @Override
+  public boolean canExecuteInParallel() {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
index 0f990e6..bb0fff4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
@@ -357,4 +357,12 @@ public class FunctionTask extends Task<FunctionWork> {
   public String getName() {
     return "FUNCTION";
   }
+
+  /**
+   * This needs access to session state resource downloads, which in turn use references to Registry objects.
+   */
+  @Override
+  public boolean canExecuteInParallel() {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index a1e4f96..c8ad795 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -18,22 +18,12 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.io.HdfsUtils;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
@@ -68,11 +58,19 @@ import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
-import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+
 /**
  * MoveTask implementation.
  **/
@@ -458,7 +456,7 @@ public class MoveTask extends Task<MoveWork> implements 
Serializable {
                 dpCtx.getNumDPCols(),
                 isSkewedStoredAsDirs(tbd),
                 work.getLoadTableWork().getWriteType() != 
AcidUtils.Operation.NOT_ACID,
-                SessionState.get().getTxnMgr().getCurrentTxnId(), 
hasFollowingStatsTask(),
+                work.getLoadTableWork().getCurrentTransactionId(), 
hasFollowingStatsTask(),
                 work.getLoadTableWork().getWriteType());
 
             // publish DP columns to its subscribers
@@ -506,10 +504,10 @@ public class MoveTask extends Task<MoveWork> implements 
Serializable {
               dc = new DataContainer(table.getTTable(), partn.getTPartition());
 
               // Don't set lineage on delete as we don't have all the columns
-              if (SessionState.get() != null &&
+              if (work.getLineagState() != null &&
                   work.getLoadTableWork().getWriteType() != 
AcidUtils.Operation.DELETE &&
                   work.getLoadTableWork().getWriteType() != 
AcidUtils.Operation.UPDATE) {
-                
SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc,
+                work.getLineagState().setLineage(tbd.getSourcePath(), dc,
                     table.getCols());
               }
               LOG.info("\tLoading partition " + entry.getKey());
@@ -540,7 +538,7 @@ public class MoveTask extends Task<MoveWork> implements 
Serializable {
             }
          }
         }
-        if (SessionState.get() != null && dc != null) {
+        if (work.getLineagState() != null && dc != null) {
           // If we are doing an update or a delete the number of columns in 
the table will not
           // match the number of columns in the file sink.  For update there 
will be one too many
           // (because of the ROW__ID), and in the case of the delete there 
will be just the
@@ -551,14 +549,14 @@ public class MoveTask extends Task<MoveWork> implements 
Serializable {
             case UPDATE:
               // Pass an empty list as no columns will be written to the file.
               // TODO I should be able to make this work for update
-              tableCols = new ArrayList<FieldSchema>();
+              tableCols = new ArrayList<>();
               break;
 
             default:
               tableCols = table.getCols();
               break;
           }
-          SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), 
dc, tableCols);
+          work.getLineagState().setLineage(tbd.getSourcePath(), dc, tableCols);
         }
         releaseLocks(tbd);
       }

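MoveTask now reads the lineage state and transaction id from its MoveWork
rather than the thread-bound SessionState, so it can run on a worker thread
that has no session attached. The shape of the change, with stand-in names
(LineageSketch and its nested interface are illustrative, not Hive's real
signatures):

final class LineageSketch {
  interface LineageState { void setLineage(String sourcePath); }

  static void recordLineage(LineageState fromWork, String sourcePath) {
    // was: if (SessionState.get() != null)
    //          SessionState.get().getLineageState().setLineage(...)
    if (fromWork != null) {
      fromWork.setLineage(sourcePath);
    }
  }
}
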
http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/NodeUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/NodeUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/NodeUtils.java
index 249ffb3..a9f9fe2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/NodeUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/NodeUtils.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec;
 
 import org.apache.hadoop.hive.ql.lib.Node;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -33,7 +34,7 @@ import java.util.Set;
 public class NodeUtils {
 
 
-  public static <T> void iterateTask(Collection<Task<?>> tasks, Class<T> 
clazz, Function<T> function) {
+  public static <T> void iterateTask(Collection<Task<? extends Serializable>> 
tasks, Class<T> clazz, Function<T> function) {
     // Does a breadth first traversal of the tasks
     Set<Task> visited = new HashSet<Task>();
     while (!tasks.isEmpty()) {
@@ -42,7 +43,7 @@ public class NodeUtils {
     return;
   }
 
-  private static <T> Collection<Task<?>> iterateTask(Collection<Task<?>> tasks,
+  private static <T> Collection<Task<? extends Serializable>> 
iterateTask(Collection<Task<?>> tasks,
                                                      Class<T> clazz,
                                                      Function<T> function,
                                                      Set<Task> visited) {
@@ -92,7 +93,7 @@ public class NodeUtils {
     return childListNodes;
   }
 
-  public static interface Function<T> {
+  public interface Function<T> {
     void apply(T argument);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index e1bd291..6193b90 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -18,17 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
-import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.DriverContext;
@@ -47,6 +37,15 @@ import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+
 /**
  * Task implementation.
  **/
@@ -113,7 +112,17 @@ public abstract class Task<T extends Serializable> 
implements Serializable, Node
   public enum FeedType {
     DYNAMIC_PARTITIONS, // list of dynamic partitions
   }
+
+  /**
+   * The order of the states here is important, as the ordinal values are used
+   * to determine the progression of taskState over its lifecycle, which is
+   * then used to make some decisions in Driver.execute.
+   */
   public enum TaskState {
+    // Task state is unknown
+    UNKNOWN,
+    // Task is just created
+    CREATED,
     // Task data structures have been initialized
     INITIALIZED,
     // Task has been queued for execution by the driver
@@ -121,11 +130,7 @@ public abstract class Task<T extends Serializable> 
implements Serializable, Node
     // Task is currently running
     RUNNING,
     // Task has completed
-    FINISHED,
-    // Task is just created
-    CREATED,
-    // Task state is unkown
-    UNKNOWN
+    FINISHED
   }
 
   // Bean methods
@@ -366,38 +371,44 @@ public abstract class Task<T extends Serializable> 
implements Serializable, Node
       }
     }
   }
-  public void setStarted() {
+
+  public synchronized void setStarted() {
     setState(TaskState.RUNNING);
   }
 
-  public boolean started() {
+  public synchronized boolean started() {
     return taskState == TaskState.RUNNING;
   }
 
-  public boolean done() {
+  public synchronized boolean done() {
     return taskState == TaskState.FINISHED;
   }
 
-  public void setDone() {
+  public synchronized void setDone() {
     setState(TaskState.FINISHED);
   }
 
-  public void setQueued() {
+  public synchronized void setQueued() {
     setState(TaskState.QUEUED);
   }
 
-  public boolean getQueued() {
+  public synchronized boolean getQueued() {
     return taskState == TaskState.QUEUED;
   }
 
-  public void setInitialized() {
+  public synchronized void setInitialized() {
     setState(TaskState.INITIALIZED);
   }
 
-  public boolean getInitialized() {
+  public synchronized boolean getInitialized() {
     return taskState == TaskState.INITIALIZED;
   }
 
+  public synchronized boolean isNotInitialized() {
+    return taskState.ordinal() < TaskState.INITIALIZED.ordinal();
+  }
+
+
   public boolean isRunnable() {
     boolean isrunnable = true;
     if (parentTasks != null) {
@@ -630,5 +641,7 @@ public abstract class Task<T extends Serializable> 
implements Serializable, Node
     return toString().equals(String.valueOf(obj));
   }
 
-
+  public boolean canExecuteInParallel(){
+    return true;
+  }
 }

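Reordering TaskState is what makes the new isNotInitialized() a single
comparison: declaration order fixes the ordinals, so the two states that
precede INITIALIZED (UNKNOWN, CREATED) both have smaller ordinal values. A
runnable illustration (SketchState stands in for TaskState):

enum SketchState { UNKNOWN, CREATED, INITIALIZED, QUEUED, RUNNING, FINISHED }

class OrdinalDemo {
  static boolean isNotInitialized(SketchState s) {
    return s.ordinal() < SketchState.INITIALIZED.ordinal();
  }

  public static void main(String[] args) {
    System.out.println(isNotInitialized(SketchState.CREATED));     // true
    System.out.println(isNotInitialized(SketchState.QUEUED));      // false
    System.out.println(isNotInitialized(SketchState.INITIALIZED)); // false
  }
}
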
http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
index 1bd4db7..41a1ef1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hive.common.io.CachingPrintStream;
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -49,7 +48,6 @@ import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.ResourceType;
 import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.hive.common.util.StreamPrinter;
 
 /**

http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 3cae543..f9991d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -356,4 +356,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> 
implements Serializable {
   public StageType getType() {
     return StageType.REPL_DUMP;
   }
+
+  @Override
+  public boolean canExecuteInParallel() {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
index ca2e992..4d8d06a 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
@@ -70,7 +70,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> 
implements Serializable {
   protected int execute(DriverContext driverContext) {
     try {
       int maxTasks = 
conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
-      Context context = new Context(conf, getHive());
+      Context context = new Context(conf, getHive(), 
work.sessionStateLineageState,
+          work.currentTransactionId);
       TaskTracker loadTaskTracker = new TaskTracker(maxTasks);
       /*
           for now for simplicity we are doing just one directory ( one 
database ), come back to use
@@ -206,6 +207,11 @@ public class ReplLoadTask extends Task<ReplLoadWork> 
implements Serializable {
         work.updateDbEventState(null);
       }
       this.childTasks = scope.rootTasks;
+      /*
+      Since there can be multiple rounds of this run, all of which are tied to
+      the same query id (generated in the compile phase), an additional UUID is
+      appended to print each run in separate files.
+       */
       LOG.info("Root Tasks / Total Tasks : {} / {} ", childTasks.size(), 
loadTaskTracker.numberOfTasks());
 
       // Populate the driver context with the scratch dir info from the repl 
context, so that the temp dirs will be cleaned up later

http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
index a8e9067..c4ca121 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
@@ -22,6 +22,7 @@ import 
org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
 import 
org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator;
 import 
org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator;
 import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.session.LineageState;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -37,17 +38,28 @@ public class ReplLoadWork implements Serializable {
   private int loadTaskRunCount = 0;
   private DatabaseEvent.State state = null;
 
+  /*
+  These are SessionState objects that are copied over to the work object to allow
+  for parallel execution. Based on the current use case, the methods are selectively
+  synchronized, which needs to be kept in mind when using other methods.
+  */
+  final LineageState sessionStateLineageState;
+  public final long currentTransactionId;
+
   public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String 
dbNameToLoadIn,
-      String tableNameToLoadIn) throws IOException {
+      String tableNameToLoadIn, LineageState lineageState, long 
currentTransactionId)
+      throws IOException {
     this.tableNameToLoadIn = tableNameToLoadIn;
+    sessionStateLineageState = lineageState;
+    this.currentTransactionId = currentTransactionId;
     this.iterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, 
hiveConf);
     this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, 
hiveConf);
     this.dbNameToLoadIn = dbNameToLoadIn;
   }
 
-  public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String 
dbNameOrPattern)
-      throws IOException {
-    this(hiveConf, dumpDirectory, dbNameOrPattern, null);
+  public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String 
dbNameOrPattern,
+      LineageState lineageState, long currentTransactionId) throws IOException 
{
+    this(hiveConf, dumpDirectory, dbNameOrPattern, null, lineageState, 
currentTransactionId);
   }
 
   public BootstrapEventsIterator iterator() {

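ReplLoadWork now carries the two pieces of session state its load tasks need,
captured once on the compiling thread. A compact sketch of the pattern
(SketchLoadWork is a stand-in with simplified field types):

class SketchLoadWork implements java.io.Serializable {
  // Session-scoped values snapshotted at compile time and read back at
  // execute time, so worker threads never touch SessionState directly.
  final Object sessionStateLineageState; // shared; selectively synchronized
  final long currentTransactionId;       // frozen per query at compile time

  SketchLoadWork(Object lineageState, long currentTransactionId) {
    this.sessionStateLineageState = lineageState;
    this.currentTransactionId = currentTransactionId;
  }
}
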
http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index cef07ad..5c6ef9f 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -238,10 +238,10 @@ public class LoadPartitions {
       Path tmpPath) {
     LoadTableDesc loadTableWork = new LoadTableDesc(
         tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(),
-        event.replicationSpec().isReplace()
-    );
+        event.replicationSpec().isReplace());
     loadTableWork.setInheritTableSpecs(false);
-    MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), 
loadTableWork, null, false);
+    MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), 
loadTableWork, null, false,
+        context.sessionStateLineageState);
     return TaskFactory.get(work, context.hiveConf);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index 2bf3784..a9a9162 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -224,10 +224,10 @@ public class LoadTable {
         ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, 
context.hiveConf);
 
     LoadTableDesc loadTableWork = new LoadTableDesc(
-        tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), 
replicationSpec.isReplace()
-    );
+        tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), 
replicationSpec.isReplace());
     MoveWork moveWork =
-        new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, 
false);
+        new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, 
false,
+            context.sessionStateLineageState);
     Task<?> loadTableTask = TaskFactory.get(moveWork, context.hiveConf);
     copyTask.addDependentTask(loadTableTask);
     return copyTask;

http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
index 8948b0c..5e3e31e 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.session.LineageState;
 
 public class Context {
   public final HiveConf hiveConf;
@@ -28,10 +29,22 @@ public class Context {
   public final Warehouse warehouse;
   public final PathInfo pathInfo;
 
-  public Context(HiveConf hiveConf, Hive hiveDb) throws MetaException {
+  /*
+  These are SessionState objects that are copied over to the work object to allow
+  for parallel execution. Based on the current use case, the methods are selectively
+  synchronized, which needs to be kept in mind when using other methods.
+ */
+  public final LineageState sessionStateLineageState;
+  public final long currentTransactionId;
+
+
+  public Context(HiveConf hiveConf, Hive hiveDb,
+      LineageState lineageState, long currentTransactionId) throws 
MetaException {
     this.hiveConf = hiveConf;
     this.hiveDb = hiveDb;
     this.warehouse = new Warehouse(hiveConf);
     this.pathInfo = new PathInfo(hiveConf);
+    sessionStateLineageState = lineageState;
+    this.currentTransactionId = currentTransactionId;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java 
b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
index 7305436..05b7d71 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java
@@ -430,7 +430,7 @@ public class LineageInfo implements Serializable {
   /**
    * This class tracks the predicate information for an operator.
    */
-  public static class Predicate {
+  public static class Predicate implements Serializable {
 
     /**
      * Expression string for the predicate.

http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index c64bc8c..89fbc14 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.io;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -317,7 +318,7 @@ public class AcidUtils {
     return result;
   }
 
-  public enum Operation {
+  public enum Operation implements Serializable {
     NOT_ACID, INSERT, UPDATE, DELETE;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
index da99c23..0e86aac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.FileInputFormat;

http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index da153e3..25cca7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
 import org.apache.hadoop.hive.ql.io.orc.OrcFileStripeMergeInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileBlockMergeInputFormat;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -1222,7 +1223,6 @@ public final class GenMapRedUtils {
 
   /**
    * @param fsInput The FileSink operator.
-   * @param ctx The MR processing context.
    * @param finalName the final destination path the merge job should output.
    * @param dependencyTask
    * @param mvTasks
@@ -1309,8 +1309,10 @@ public final class GenMapRedUtils {
     //
     // 2. Constructing a conditional task consisting of a move task and a map 
reduce task
     //
+    HiveTxnManager txnMgr = SessionState.get().getTxnMgr();
     MoveWork dummyMv = new MoveWork(null, null, null,
-         new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, 
null, null), false);
+         new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null,
+             null), false, SessionState.get().getLineageState());
     MapWork cplan;
     Serializable work;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java
index 3c20532..5557138 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.optimizer.lineage;
 
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
@@ -47,7 +48,7 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc;
  */
 public class LineageCtx implements NodeProcessorCtx {
 
-  public static class Index {
+  public static class Index implements Serializable {
 
     /**
      * The map contains an index from the (operator, columnInfo) to the

http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index 251deca..2c55011 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -592,7 +592,7 @@ public class DDLSemanticAnalyzer extends 
BaseSemanticAnalyzer {
     ColumnStatsDesc cStatsDesc = new ColumnStatsDesc(tbl.getDbName() + "." + 
tbl.getTableName(),
         Arrays.asList(colName), Arrays.asList(colType), partSpec == null);
     ColumnStatsUpdateTask cStatsUpdateTask = (ColumnStatsUpdateTask) 
TaskFactory
-        .get(new ColumnStatsUpdateWork(cStatsDesc, partName, mapProp), conf);
+        .get(new ColumnStatsUpdateWork(cStatsDesc, partName, mapProp, 
SessionState.get().getCurrentDatabase()), conf);
     rootTasks.add(cStatsUpdateTask);
   }
 
@@ -1083,9 +1083,10 @@ public class DDLSemanticAnalyzer extends 
BaseSemanticAnalyzer {
         Path queryTmpdir = ctx.getExternalTmpPath(newTblPartLoc);
         truncateTblDesc.setOutputDir(queryTmpdir);
         LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc,
-            partSpec == null ? new HashMap<String, String>() : partSpec);
+            partSpec == null ? new HashMap<>() : partSpec);
         ltd.setLbCtx(lbCtx);
-        Task<MoveWork> moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, 
null, false),
+        Task<MoveWork> moveTsk = TaskFactory
+            .get(new MoveWork(null, null, ltd, null, false, 
SessionState.get().getLineageState()),
             conf);
         truncateTask.addDependentTask(moveTsk);
 
@@ -1719,7 +1720,8 @@ public class DDLSemanticAnalyzer extends 
BaseSemanticAnalyzer {
       LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc,
           partSpec == null ? new HashMap<>() : partSpec);
       ltd.setLbCtx(lbCtx);
-      Task<MoveWork> moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, 
null, false),
+      Task<MoveWork> moveTsk = TaskFactory
+          .get(new MoveWork(null, null, ltd, null, false, 
SessionState.get().getLineageState()),
           conf);
       mergeTask.addDependentTask(moveTsk);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 89c4e2c..751bda0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -18,19 +18,6 @@
 
 package org.apache.hadoop.hive.ql.parse;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
 import org.antlr.runtime.tree.Tree;
 import org.apache.commons.lang.ObjectUtils;
 import org.apache.hadoop.fs.FileStatus;
@@ -70,6 +57,19 @@ import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.slf4j.Logger;
 
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
 /**
  * ImportSemanticAnalyzer.
  *
@@ -349,10 +349,11 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
     Path tmpPath = x.getCtx().getExternalTmpPath(tgtPath);
     Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, 
tmpPath, x.getConf());
     LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath,
-        Utilities.getTableDesc(table), new TreeMap<String, String>(),
+        Utilities.getTableDesc(table), new TreeMap<>(),
         replace);
     Task<?> loadTableTask = TaskFactory.get(new MoveWork(x.getInputs(),
-        x.getOutputs(), loadTableWork, null, false), x.getConf());
+            x.getOutputs(), loadTableWork, null, false, 
SessionState.get().getLineageState()),
+        x.getConf());
     copyTask.addDependentTask(loadTableTask);
     x.getTasks().add(copyTask);
     return loadTableTask;
@@ -422,7 +423,8 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
           partSpec.getPartSpec(), replicationSpec.isReplace());
       loadTableWork.setInheritTableSpecs(false);
       Task<?> loadPartTask = TaskFactory.get(new MoveWork(
-          x.getInputs(), x.getOutputs(), loadTableWork, null, false),
+              x.getInputs(), x.getOutputs(), loadTableWork, null, false,
+              SessionState.get().getLineageState()),
           x.getConf());
       copyTask.addDependentTask(loadPartTask);
       addPartTask.addDependentTask(loadPartTask);

http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
index fa79700..8879b80 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
@@ -20,8 +20,6 @@ package org.apache.hadoop.hive.ql.parse;
 
 import org.apache.hadoop.hive.conf.HiveConf.StrictChecks;
 
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
@@ -54,10 +52,10 @@ import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.StatsWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.mapred.InputFormat;
 
 import com.google.common.collect.Lists;
-import org.apache.orc.impl.OrcAcidUtils;
 
 /**
  * LoadSemanticAnalyzer.
@@ -285,8 +283,10 @@ public class LoadSemanticAnalyzer extends 
BaseSemanticAnalyzer {
       loadTableWork.setInheritTableSpecs(false);
     }
 
-    Task<? extends Serializable> childTask = TaskFactory.get(new 
MoveWork(getInputs(),
-        getOutputs(), loadTableWork, null, true, isLocal), conf);
+    Task<? extends Serializable> childTask = TaskFactory.get(
+        new MoveWork(getInputs(), getOutputs(), loadTableWork, null, true,
+            isLocal, SessionState.get().getLineageState()), conf
+    );
     if (rTask != null) {
       rTask.addDependentTask(childTask);
     } else {

http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index fd97baa..e7e7f9b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.session.SessionState;
 
 import java.io.FileNotFoundException;
 import java.io.Serializable;
@@ -286,7 +287,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
 
      if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) {
         ReplLoadWork replLoadWork =
-            new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, tblNameOrPattern);
+            new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, tblNameOrPattern,
+                SessionState.get().getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId());
         rootTasks.add(TaskFactory.get(replLoadWork, conf));
         return;
       }
@@ -316,7 +318,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
                   + " does not correspond to REPL LOAD expecting to load to a 
singular destination point.");
         }
 
-        ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern);
+        ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern,
+            SessionState.get().getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId());
         rootTasks.add(TaskFactory.get(replLoadWork, conf));
         //
         //        for (FileStatus dir : dirsInLoadPath) {

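Note: both REPL LOAD paths above now snapshot the two pieces of session-scoped state the load will need: the lineage state and the current transaction id. A hedged sketch of the constructor shape this implies; the field names are illustrative, and ReplLoadWork.java in this commit carries the real signature.

    // Illustrative only: mirrors the call sites above, not the exact class.
    class ReplLoadWorkSketch implements java.io.Serializable {
        private final Object lineageState; // from SessionState.get().getLineageState()
        private final long currentTxnId;   // from SessionState.get().getTxnMgr().getCurrentTxnId()

        ReplLoadWorkSketch(Object lineageState, long currentTxnId) {
            this.lineageState = lineageState;
            this.currentTxnId = currentTxnId;
        }
    }
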
http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index d56fd21..4814fcd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -6902,7 +6902,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
           acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
           checkAcidConstraints(qb, table_desc, dest_tab);
         }
-        ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp);
+        Long currentTransactionId = acidOp == Operation.NOT_ACID ? null :
+            SessionState.get().getTxnMgr().getCurrentTxnId();
+        ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp,
+            currentTransactionId);
         // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old
         // deltas and base and leave them up to the cleaner to clean up
         ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
@@ -7017,7 +7020,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
         checkAcidConstraints(qb, table_desc, dest_tab);
       }
-      ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp);
+      Long currentTransactionId = (acidOp == Operation.NOT_ACID) ? null :
+          SessionState.get().getTxnMgr().getCurrentTxnId();
+      ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp,
+          currentTransactionId);
      // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old
       // deltas and base and leave them up to the cleaner to clean up
       ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),

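Note: in both hunks the transaction id is captured only for transactional writes; a NOT_ACID load carries null. A self-contained illustration of that contract (the enum and method are stand-ins, not the Hive API):

    public class TxnIdCapture {
        enum Operation { NOT_ACID, INSERT, UPDATE, DELETE }

        // Boxes the session's txn id only when the write is transactional.
        static Long txnIdFor(Operation op, long sessionTxnId) {
            return op == Operation.NOT_ACID ? null : sessionTxnId;
        }

        public static void main(String[] args) {
            System.out.println(txnIdFor(Operation.NOT_ACID, 42L)); // null
            System.out.println(txnIdFor(Operation.INSERT, 42L));   // 42
        }
    }
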
http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index 08a8f00..f0089fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -18,20 +18,8 @@
 
 package org.apache.hadoop.hive.ql.parse;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.Set;
-import java.util.Stack;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.common.collect.Interner;
+import com.google.common.collect.Interners;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -42,7 +30,6 @@ import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.ColumnStatsTask;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
-import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.StatsTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -54,7 +41,6 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
-import org.apache.hadoop.hive.ql.optimizer.physical.AnnotateRunTimeStatsOptimizer;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.AnalyzeRewriteContext;
 import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc;
 import org.apache.hadoop.hive.ql.plan.ColumnStatsWork;
@@ -76,9 +62,16 @@ import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.thrift.ThriftFormatter;
 import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Interner;
-import com.google.common.collect.Interners;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
 
 /**
  * TaskCompiler is a the base class for classes that compile
@@ -107,7 +100,7 @@ public abstract class TaskCompiler {
 
     Context ctx = pCtx.getContext();
     GlobalLimitCtx globalLimitCtx = pCtx.getGlobalLimitCtx();
-    List<Task<MoveWork>> mvTask = new ArrayList<Task<MoveWork>>();
+    List<Task<MoveWork>> mvTask = new ArrayList<>();
 
     List<LoadTableDesc> loadTableWork = pCtx.getLoadTableWork();
     List<LoadFileDesc> loadFileWork = pCtx.getLoadFileWork();
@@ -214,7 +207,9 @@ public abstract class TaskCompiler {
       }
     } else if (!isCStats) {
       for (LoadTableDesc ltd : loadTableWork) {
-        Task<MoveWork> tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf);
+        Task<MoveWork> tsk = TaskFactory
+            .get(new MoveWork(null, null, ltd, null, false, SessionState.get().getLineageState()),
+                conf);
         mvTask.add(tsk);
         // Check to see if we are stale'ing any indexes and auto-update them if we want
         if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEINDEXAUTOUPDATE)) {
@@ -258,9 +253,7 @@ public abstract class TaskCompiler {
               }
               Warehouse wh = new Warehouse(conf);
              targetPath = wh.getDefaultTablePath(db.getDatabase(names[0]), names[1]);
-            } catch (HiveException e) {
-              throw new SemanticException(e);
-            } catch (MetaException e) {
+            } catch (HiveException | MetaException e) {
               throw new SemanticException(e);
             }
 
@@ -272,7 +265,9 @@ public abstract class TaskCompiler {
 
           oneLoadFile = false;
         }
-        mvTask.add(TaskFactory.get(new MoveWork(null, null, null, lfd, false), conf));
+        mvTask.add(TaskFactory
+            .get(new MoveWork(null, null, null, lfd, false, SessionState.get().getLineageState()),
+                conf));
       }
     }
 
@@ -296,7 +291,7 @@ public abstract class TaskCompiler {
     * a column stats task instead of a fetch task to persist stats to the metastore.
      */
     if (isCStats || !pCtx.getColumnStatsAutoGatherContexts().isEmpty()) {
-      Set<Task<? extends Serializable>> leafTasks = new LinkedHashSet<Task<? extends Serializable>>();
+      Set<Task<? extends Serializable>> leafTasks = new LinkedHashSet<>();
       getLeafTasks(rootTasks, leafTasks);
       if (isCStats) {
        genColumnStatsTask(pCtx.getAnalyzeRewrite(), loadFileWork, leafTasks, outerQueryLimit, 0);
@@ -384,7 +379,7 @@ public abstract class TaskCompiler {
     // find all leaf tasks and make the DDLTask as a dependent task of all of
     // them
     HashSet<Task<? extends Serializable>> leaves =
-        new LinkedHashSet<Task<? extends Serializable>>();
+        new LinkedHashSet<>();
     getLeafTasks(rootTasks, leaves);
     assert (leaves.size() > 0);
     for (Task<? extends Serializable> task : leaves) {
@@ -411,19 +406,14 @@ public abstract class TaskCompiler {
   * This method generates a plan with a column stats task on top of map-red task and sets up the
    * appropriate metadata to be used during execution.
    *
-   * @param analyzeRewrite
-   * @param loadTableWork
-   * @param loadFileWork
-   * @param rootTasks
-   * @param outerQueryLimit
    */
   @SuppressWarnings("unchecked")
   protected void genColumnStatsTask(AnalyzeRewriteContext analyzeRewrite,
      List<LoadFileDesc> loadFileWork, Set<Task<? extends Serializable>> leafTasks,
       int outerQueryLimit, int numBitVector) {
-    ColumnStatsTask cStatsTask = null;
-    ColumnStatsWork cStatsWork = null;
-    FetchWork fetch = null;
+    ColumnStatsTask cStatsTask;
+    ColumnStatsWork cStatsWork;
+    FetchWork fetch;
     String tableName = analyzeRewrite.getTableName();
     List<String> colName = analyzeRewrite.getColName();
     List<String> colType = analyzeRewrite.getColType();
@@ -450,7 +440,7 @@ public abstract class TaskCompiler {
 
     ColumnStatsDesc cStatsDesc = new ColumnStatsDesc(tableName,
         colName, colType, isTblLevel, numBitVector);
-    cStatsWork = new ColumnStatsWork(fetch, cStatsDesc);
+    cStatsWork = new ColumnStatsWork(fetch, cStatsDesc, SessionState.get().getCurrentDatabase());
     cStatsTask = (ColumnStatsTask) TaskFactory.get(cStatsWork, conf);
     for (Task<? extends Serializable> tsk : leafTasks) {
       tsk.addDependentTask(cStatsTask);
@@ -461,7 +451,7 @@ public abstract class TaskCompiler {
   /**
    * Find all leaf tasks of the list of root tasks.
    */
-  protected void getLeafTasks(List<Task<? extends Serializable>> rootTasks,
+  private void getLeafTasks(List<Task<? extends Serializable>> rootTasks,
       Set<Task<? extends Serializable>> leaves) {
 
     for (Task<? extends Serializable> root : rootTasks) {

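Note: genColumnStatsTask now stamps the compiling session's current database into the work object, so the stats task no longer resolves it from SessionState on whatever thread happens to execute it. A sketch of why the snapshot matters; the type and the qualify helper are hypothetical stand-ins, not part of the commit.

    // Stand-in for ColumnStatsWork-style state capture.
    class StatsWorkSketch implements java.io.Serializable {
        private final String currentDatabaseName; // resolved once, at compile time

        StatsWorkSketch(String currentDatabaseName) {
            this.currentDatabaseName = currentDatabaseName;
        }

        // Unqualified table names resolve against the captured database,
        // not against whatever database the executing thread sees.
        String qualify(String tableName) {
            return tableName.contains(".")
                ? tableName
                : currentDatabaseName + "." + tableName;
        }
    }
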
http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java
index 8db2889..5f9041e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.plan;
 
 import java.io.Serializable;
-import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 
@@ -38,12 +37,14 @@ public class ColumnStatsUpdateWork implements Serializable {
   private ColumnStatsDesc colStats;
   private String partName;
   private Map<String, String> mapProp;
+  private String currentDatabaseName;
 
   public ColumnStatsUpdateWork(ColumnStatsDesc colStats, String partName,
-      Map<String, String> mapProp) {
+      Map<String, String> mapProp, String currentDatabaseName) {
     this.partName = partName;
     this.colStats = colStats;
     this.mapProp = mapProp;
+    this.currentDatabaseName = currentDatabaseName;
   }
 
   @Override
@@ -64,4 +65,11 @@ public class ColumnStatsUpdateWork implements Serializable {
     return mapProp;
   }
 
+  public String getCurrentDatabaseName() {
+    return currentDatabaseName;
+  }
+
+  public void setCurrentDatabaseName(String currentDatabaseName) {
+    this.currentDatabaseName = currentDatabaseName;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java
index 76811b1..842fd1a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java
@@ -34,15 +34,17 @@ public class ColumnStatsWork implements Serializable {
   private static final long serialVersionUID = 1L;
   private FetchWork fWork;
   private ColumnStatsDesc colStats;
+  private String currentDatabaseName;
   private static final int LIMIT = -1;
 
 
   public ColumnStatsWork() {
   }
 
-  public ColumnStatsWork(FetchWork work, ColumnStatsDesc colStats) {
+  public ColumnStatsWork(FetchWork work, ColumnStatsDesc colStats, String currentDatabaseName) {
     this.fWork = work;
     this.setColStats(colStats);
+    this.currentDatabaseName = currentDatabaseName;
   }
 
   @Override
@@ -85,4 +87,11 @@ public class ColumnStatsWork implements Serializable {
     return LIMIT;
   }
 
+  public String getCurrentDatabaseName() {
+    return currentDatabaseName;
+  }
+
+  public void setCurrentDatabaseName(String currentDatabaseName) {
+    this.currentDatabaseName = currentDatabaseName;
+  }
 }

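Note: a usage sketch of the new three-argument constructor, matching the TaskCompiler call site earlier in this commit; fetchWork and columnStatsDesc are assumed to be in scope.

    // Snapshot the session's database once, at plan-construction time.
    ColumnStatsWork work = new ColumnStatsWork(fetchWork, columnStatsDesc,
        SessionState.get().getCurrentDatabase());
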
http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java
index 45d4fb0..023d247 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java
@@ -35,7 +35,7 @@ public class LoadDesc implements Serializable {
   * Need to remember whether this is an acid compliant operation, and if so whether it is an
    * insert, update, or delete.
    */
-  private final AcidUtils.Operation writeType;
+  final AcidUtils.Operation writeType;
 
   public LoadDesc(final Path sourcePath, AcidUtils.Operation writeType) {
     this.sourcePath = sourcePath;

http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
index ab0a92a..90a970c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
@@ -18,14 +18,14 @@
 
 package org.apache.hadoop.hive.ql.plan;
 
-import java.io.Serializable;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
 /**
  * LoadTableDesc.
  *
@@ -37,6 +37,10 @@ public class LoadTableDesc extends LoadDesc implements Serializable {
   private ListBucketingCtx lbCtx;
   private boolean inheritTableSpecs = true; //For partitions, flag controlling whether the current
                                             //table specs are to be used
+  /*
+  if the writeType above is NOT_ACID then the currentTransactionId will be null
+   */
+  private final Long currentTransactionId;
 
   // TODO: the below seems like they should just be combined into partitionDesc
   private org.apache.hadoop.hive.ql.plan.TableDesc table;
@@ -49,16 +53,18 @@ public class LoadTableDesc extends LoadDesc implements Serializable {
     this.dpCtx = o.dpCtx;
     this.lbCtx = o.lbCtx;
     this.inheritTableSpecs = o.inheritTableSpecs;
+    this.currentTransactionId = o.currentTransactionId;
     this.table = o.table;
     this.partitionSpec = o.partitionSpec;
   }
 
   public LoadTableDesc(final Path sourcePath,
-      final org.apache.hadoop.hive.ql.plan.TableDesc table,
+      final TableDesc table,
       final Map<String, String> partitionSpec,
       final boolean replace,
-      final AcidUtils.Operation writeType) {
+      final AcidUtils.Operation writeType, Long currentTransactionId) {
     super(sourcePath, writeType);
+    this.currentTransactionId = currentTransactionId;
     init(table, partitionSpec, replace);
   }
 
@@ -70,17 +76,18 @@ public class LoadTableDesc extends LoadDesc implements Serializable {
    * @param replace
    */
   public LoadTableDesc(final Path sourcePath,
-                       final TableDesc table,
-                       final Map<String, String> partitionSpec,
-                       final boolean replace) {
-    this(sourcePath, table, partitionSpec, replace, AcidUtils.Operation.NOT_ACID);
+      final TableDesc table,
+      final Map<String, String> partitionSpec,
+      final boolean replace) {
+    this(sourcePath, table, partitionSpec, replace, AcidUtils.Operation.NOT_ACID,
+        null);
   }
 
   public LoadTableDesc(final Path sourcePath,
-      final org.apache.hadoop.hive.ql.plan.TableDesc table,
+      final TableDesc table,
       final Map<String, String> partitionSpec,
-      final AcidUtils.Operation writeType) {
-    this(sourcePath, table, partitionSpec, true, writeType);
+      final AcidUtils.Operation writeType, Long currentTransactionId) {
+    this(sourcePath, table, partitionSpec, true, writeType, currentTransactionId);
   }
 
   /**
@@ -90,21 +97,22 @@ public class LoadTableDesc extends LoadDesc implements Serializable {
    * @param partitionSpec
    */
   public LoadTableDesc(final Path sourcePath,
-                       final org.apache.hadoop.hive.ql.plan.TableDesc table,
-                       final Map<String, String> partitionSpec) {
-    this(sourcePath, table, partitionSpec, true, AcidUtils.Operation.NOT_ACID);
+      final TableDesc table,
+      final Map<String, String> partitionSpec) {
+    this(sourcePath, table, partitionSpec, true, AcidUtils.Operation.NOT_ACID, null);
   }
 
   public LoadTableDesc(final Path sourcePath,
-      final org.apache.hadoop.hive.ql.plan.TableDesc table,
+      final TableDesc table,
       final DynamicPartitionCtx dpCtx,
-      final AcidUtils.Operation writeType) {
+      final AcidUtils.Operation writeType, Long currentTransactionId) {
     super(sourcePath, writeType);
     this.dpCtx = dpCtx;
+    this.currentTransactionId = currentTransactionId;
    if (dpCtx != null && dpCtx.getPartSpec() != null && partitionSpec == null) {
       init(table, dpCtx.getPartSpec(), true);
     } else {
-      init(table, new LinkedHashMap<String, String>(), true);
+      init(table, new LinkedHashMap<>(), true);
     }
   }
 
@@ -174,4 +182,7 @@ public class LoadTableDesc extends LoadDesc implements Serializable {
     this.lbCtx = lbCtx;
   }
 
+  public long getCurrentTransactionId() {
+    return writeType == AcidUtils.Operation.NOT_ACID ? 0L : currentTransactionId;
+  }
 }

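Note: the getter above unboxes currentTransactionId only on the ACID path and returns 0 for NOT_ACID writes, so the field may legitimately be null only in the NOT_ACID case. A compact, self-contained restatement of that contract (stand-in enum and class, not the Hive types):

    class TxnContract {
        enum Op { NOT_ACID, INSERT }

        private final Op writeType;
        private final Long currentTransactionId; // null allowed only for NOT_ACID

        TxnContract(Op writeType, Long currentTransactionId) {
            this.writeType = writeType;
            this.currentTransactionId = currentTransactionId;
        }

        long getCurrentTransactionId() {
            // Unboxing happens only on the ACID branch, where the id is non-null.
            return writeType == Op.NOT_ACID ? 0L : currentTransactionId;
        }
    }
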
http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
index 8ce211f..00c0ce3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
-
+import org.apache.hadoop.hive.ql.session.LineageState;
 
 /**
  * MoveWork.
@@ -38,6 +38,12 @@ public class MoveWork implements Serializable {
   private LoadTableDesc loadTableWork;
   private LoadFileDesc loadFileWork;
   private LoadMultiFilesDesc loadMultiFilesWork;
+  /*
+  these are sessionState objects that are copied over to work to allow for parallel execution.
+  based on the current use case the methods are selectively synchronized, which might need to be
+  taken care when using other methods.
+   */
+  private final LineageState sessionStateLineageState;
 
   private boolean checkFileFormat;
   private boolean srcLocal;
@@ -57,17 +63,20 @@ public class MoveWork implements Serializable {
   protected List<Partition> movedParts;
 
   public MoveWork() {
+    sessionStateLineageState = null;
   }
 
-  public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs) {
+  private MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
+      LineageState lineageState) {
     this.inputs = inputs;
     this.outputs = outputs;
+    sessionStateLineageState = lineageState;
   }
 
   public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
       final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork,
-      boolean checkFileFormat, boolean srcLocal) {
-    this(inputs, outputs);
+      boolean checkFileFormat, boolean srcLocal, LineageState lineageState) {
+    this(inputs, outputs, lineageState);
     this.loadTableWork = loadTableWork;
     this.loadFileWork = loadFileWork;
     this.checkFileFormat = checkFileFormat;
@@ -76,8 +85,8 @@ public class MoveWork implements Serializable {
 
   public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
       final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork,
-      boolean checkFileFormat) {
-    this(inputs, outputs);
+      boolean checkFileFormat, LineageState lineageState) {
+    this(inputs, outputs, lineageState);
     this.loadTableWork = loadTableWork;
     this.loadFileWork = loadFileWork;
     this.checkFileFormat = checkFileFormat;
@@ -91,6 +100,7 @@ public class MoveWork implements Serializable {
     srcLocal = o.isSrcLocal();
     inputs = o.getInputs();
     outputs = o.getOutputs();
+    sessionStateLineageState = o.sessionStateLineageState;
   }
 
   @Explain(displayName = "tables", explainLevels = { Level.USER, 
Level.DEFAULT, Level.EXTENDED })
@@ -152,4 +162,7 @@ public class MoveWork implements Serializable {
     this.srcLocal = srcLocal;
   }
 
+  public LineageState getLineagState() {
+    return sessionStateLineageState;
+  }
 }

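Note: several move tasks may now run concurrently while sharing the single LineageState captured above, which is why its mutators are synchronized (see the next file). A stand-alone demo of that sharing pattern with stand-in types, not Hive code:

    import java.util.ArrayList;
    import java.util.List;

    public class SharedStateDemo {
        // Stands in for LineageState: one instance shared by parallel tasks.
        static class SharedState {
            private final List<String> entries = new ArrayList<>();
            synchronized void record(String e) { entries.add(e); }
            synchronized int size() { return entries.size(); }
        }

        public static void main(String[] args) throws InterruptedException {
            SharedState state = new SharedState();
            List<Thread> tasks = new ArrayList<>();
            for (int i = 0; i < 4; i++) {          // four "move tasks"
                final int id = i;
                tasks.add(new Thread(() -> state.record("task-" + id)));
            }
            for (Thread t : tasks) t.start();
            for (Thread t : tasks) t.join();
            System.out.println(state.size());      // always 4: no lost updates
        }
    }
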
http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
index e2f2a68..056d614 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java
@@ -21,11 +21,11 @@ package org.apache.hadoop.hive.ql.session;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.io.Serializable;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo.DataContainer;
@@ -36,7 +36,7 @@ import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx.Index;
  * lineage information for the post execution hooks.
  *
  */
-public class LineageState {
+public class LineageState implements Serializable {
 
   /**
   * Mapping from the directory name to FileSinkOperator (may not be FileSinkOperator for views). This
@@ -44,7 +44,7 @@ public class LineageState {
    * time and is then later used to created the mapping from
    * movetask to the set of filesink operators.
    */
-  private final Map<Path, Operator> dirToFop;
+  private final Map<String, Operator> dirToFop;
 
   /**
    * The lineage context index for this query.
@@ -60,8 +60,8 @@ public class LineageState {
   /**
    * Constructor.
    */
-  public LineageState() {
-    dirToFop = new HashMap<Path, Operator>();
+  LineageState() {
+    dirToFop = new HashMap<>();
     linfo = new LineageInfo();
     index = new Index();
   }
@@ -72,8 +72,8 @@ public class LineageState {
    * @param dir The directory name.
    * @param fop The sink operator.
    */
-  public void mapDirToOp(Path dir, Operator fop) {
-    dirToFop.put(dir, fop);
+  public synchronized void mapDirToOp(Path dir, Operator fop) {
+    dirToFop.put(dir.toUri().toString(), fop);
   }
 
   /**
@@ -83,10 +83,10 @@ public class LineageState {
    * @param newPath conditional input path
    * @param oldPath path of the old linked MoveWork
    */
-  public void updateDirToOpMap(Path newPath, Path oldPath) {
-    Operator op = dirToFop.get(oldPath);
+  public synchronized void updateDirToOpMap(Path newPath, Path oldPath) {
+    Operator op = dirToFop.get(oldPath.toUri().toString());
     if (op != null) {
-      dirToFop.put(newPath, op);
+      dirToFop.put(newPath.toUri().toString(), op);
     }
   }
 
@@ -97,10 +97,10 @@ public class LineageState {
    * @param dc The associated data container.
    * @param cols The list of columns.
    */
-  public void setLineage(Path dir, DataContainer dc,
+  public synchronized void setLineage(Path dir, DataContainer dc,
       List<FieldSchema> cols) {
     // First lookup the file sink operator from the load work.
-    Operator<?> op = dirToFop.get(dir);
+    Operator<?> op = dirToFop.get(dir.toUri().toString());
 
     // Go over the associated fields and look up the dependencies
     // by position in the row schema of the filesink operator.
@@ -136,7 +136,7 @@ public class LineageState {
   /**
    * Clear all lineage states
    */
-  public void clear() {
+  public synchronized void clear() {
     dirToFop.clear();
     linfo.clear();
     index.clear();

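Note: dirToFop is now keyed by dir.toUri().toString() rather than by Path, which keeps the map serialization-friendly and makes key equality purely textual, while the synchronized mutators make the shared instance safe under parallel task execution. A hedged sketch of the same discipline with stand-in types:

    import java.net.URI;
    import java.util.HashMap;
    import java.util.Map;

    class DirRegistry {
        private final Map<String, Object> dirToOp = new HashMap<>();

        // Normalize every directory to one canonical string key.
        private static String key(URI dir) {
            return dir.toString();
        }

        synchronized void map(URI dir, Object op) {
            dirToOp.put(key(dir), op);
        }

        synchronized Object lookup(URI dir) {
            return dirToOp.get(key(dir));
        }
    }
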