HIVE-17426 : Execution framework in hive to run tasks in parallel (Anishek Agarwal, reviewed by Daniel Dai, Thejas Nair)
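The heart of this patch is the new Task#canExecuteInParallel() hook. In the Driver.java hunk below, the launch check changes from tsk.isMapRedTask() to tsk.canExecuteInParallel(), so with hive.exec.parallel enabled any task that declares itself parallel-safe is started on its own thread, not just MR tasks. The base Task implementation returns true; tasks that depend on thread-local session state (DDLTask, ExplainTask, ExplainSQRewriteTask, FetchTask, FunctionTask, ReplDumpTask) override it to return false, and ConditionalTask keeps the old behavior by delegating to isMapRedTask(). A minimal self-contained sketch of that dispatch rule, assuming simplified types (SketchTask and SketchDriver are illustrative stand-ins, not names from the patch):

    // Sketch of the launch rule introduced here, not the actual Driver code.
    abstract class SketchTask {
      // The new hook: defaults to true, so a task must explicitly opt out
      // of parallel execution.
      public boolean canExecuteInParallel() {
        return true;
      }

      public abstract int execute();
    }

    final class SketchDriver {
      private final boolean execParallel; // mirrors hive.exec.parallel

      SketchDriver(boolean execParallel) {
        this.execParallel = execParallel;
      }

      void launch(SketchTask tsk) {
        if (execParallel && tsk.canExecuteInParallel()) {
          new Thread(tsk::execute).start(); // own thread per parallel-safe task
        } else {
          tsk.execute(); // inline on the driver thread, in submission order
        }
      }
    }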
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/39d8d73e Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/39d8d73e Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/39d8d73e Branch: refs/heads/master Commit: 39d8d73e7b7584a48f46ad6b455da6b87e80ea51 Parents: 37be57b Author: Thejas M Nair <[email protected]> Authored: Fri Sep 22 20:16:49 2017 -0700 Committer: Thejas M Nair <[email protected]> Committed: Fri Sep 22 20:16:58 2017 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 10 ++-- ...TestReplicationScenariosAcrossInstances.java | 29 +++++++++ .../java/org/apache/hadoop/hive/ql/Driver.java | 15 ++--- .../apache/hadoop/hive/ql/DriverContext.java | 2 +- .../hadoop/hive/ql/exec/ColumnStatsTask.java | 3 +- .../hive/ql/exec/ColumnStatsUpdateTask.java | 3 +- .../hadoop/hive/ql/exec/ConditionalTask.java | 7 ++- .../apache/hadoop/hive/ql/exec/CopyTask.java | 1 - .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 9 +++ .../hive/ql/exec/ExplainSQRewriteTask.java | 5 ++ .../apache/hadoop/hive/ql/exec/ExplainTask.java | 5 ++ .../apache/hadoop/hive/ql/exec/FetchTask.java | 4 ++ .../hadoop/hive/ql/exec/FunctionTask.java | 8 +++ .../apache/hadoop/hive/ql/exec/MoveTask.java | 32 +++++----- .../apache/hadoop/hive/ql/exec/NodeUtils.java | 7 ++- .../org/apache/hadoop/hive/ql/exec/Task.java | 61 +++++++++++-------- .../hadoop/hive/ql/exec/mr/MapRedTask.java | 2 - .../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 5 ++ .../ql/exec/repl/bootstrap/ReplLoadTask.java | 8 ++- .../ql/exec/repl/bootstrap/ReplLoadWork.java | 20 +++++-- .../bootstrap/load/table/LoadPartitions.java | 6 +- .../repl/bootstrap/load/table/LoadTable.java | 6 +- .../exec/repl/bootstrap/load/util/Context.java | 15 ++++- .../hadoop/hive/ql/hooks/LineageInfo.java | 2 +- .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 3 +- .../hadoop/hive/ql/io/merge/MergeFileTask.java | 1 - .../hive/ql/optimizer/GenMapRedUtils.java | 6 +- .../hive/ql/optimizer/lineage/LineageCtx.java | 3 +- .../hive/ql/parse/DDLSemanticAnalyzer.java | 10 ++-- .../hive/ql/parse/ImportSemanticAnalyzer.java | 34 ++++++----- .../hive/ql/parse/LoadSemanticAnalyzer.java | 10 ++-- .../ql/parse/ReplicationSemanticAnalyzer.java | 7 ++- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 10 +++- .../hadoop/hive/ql/parse/TaskCompiler.java | 62 ++++++++------------ .../hive/ql/plan/ColumnStatsUpdateWork.java | 12 +++- .../hadoop/hive/ql/plan/ColumnStatsWork.java | 11 +++- .../apache/hadoop/hive/ql/plan/LoadDesc.java | 2 +- .../hadoop/hive/ql/plan/LoadTableDesc.java | 49 ++++++++++------ .../apache/hadoop/hive/ql/plan/MoveWork.java | 25 ++++++-- .../hadoop/hive/ql/session/LineageState.java | 26 ++++---- ...TestGenMapRedUtilsCreateConditionalTask.java | 62 ++++++++++++-------- 41 files changed, 381 insertions(+), 217 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 1ba5968..00d2e46 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -442,11 +442,11 @@ public class HiveConf extends Configuration { "Inteval 
for cmroot cleanup thread."), REPL_FUNCTIONS_ROOT_DIR("hive.repl.replica.functions.root.dir","/user/hive/repl/functions/", "Root directory on the replica warehouse where the repl sub-system will store jars from the primary warehouse"), - REPL_APPROX_MAX_LOAD_TASKS("hive.repl.approx.max.load.tasks", 1000, - "Provide and approximate of the max number of tasks that should be executed in before \n" + - "dynamically generating the next set of tasks. The number is an approximate as we \n" + - "will stop at slightly higher number than above, the reason being some events might \n" + - "lead to an task increment that would cross the above limit"), + REPL_APPROX_MAX_LOAD_TASKS("hive.repl.approx.max.load.tasks", 10000, + "Provide an approximation of the maximum number of tasks that should be executed before \n" + + "dynamically generating the next set of tasks. The number is approximate as Hive \n" + + "will stop at a slightly higher number, the reason being some events might lead to a \n" + + "task increment that would cross the specified limit."), REPL_PARTITIONS_DUMP_PARALLELISM("hive.repl.partitions.dump.parallelism",5, "Number of threads that will be used to dump partition data information during repl dump."), REPL_DUMPDIR_CLEAN_FREQ("hive.repl.dumpdir.clean.freq", "0s", http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index 3dee0d2..50c4a98 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -248,4 +248,33 @@ public class TestReplicationScenariosAcrossInstances { .run("select country from t2 order by country") .verifyResults(new String[] { "france", "india", "us" }); } + + @Test + public void parallelExecutionOfReplicationBootStrapLoad() throws Throwable { + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create table t1 (id int)") + .run("create table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .run("insert into table t2 partition(country='australia') values ('sydney')") + .run("insert into table t2 partition(country='russia') values ('moscow')") + .run("insert into table t2 partition(country='uk') values ('london')") + .run("insert into table t2 partition(country='us') values ('sfo')") + .run("insert into table t2 partition(country='france') values ('paris')") + .run("insert into table t2 partition(country='japan') values ('tokyo')") + .run("insert into table t2 partition(country='china') values ('hkg')") + .run("create table t3 (rank int)") + .dump(primaryDbName, null); + + replica.hiveConf.setBoolVar(HiveConf.ConfVars.EXECPARALLEL, true); + replica.load(replicatedDbName, tuple.dumpLocation) + .run("use " + replicatedDbName) + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("show tables") + .verifyResults(new String[] { "t1", "t2", "t3" }) + .run("select country from t2") + .verifyResults(Arrays.asList("india", "australia", "russia", "uk", 
"us", "france", "japan", + "china")); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 6616cba..e802d42 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -39,7 +39,6 @@ import java.util.concurrent.locks.ReentrantLock; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang.StringUtils; - import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.ValidReadTxnList; @@ -90,6 +89,7 @@ import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner; import org.apache.hadoop.hive.ql.parse.ASTNode; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo; +import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState; import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHook; import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext; import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContextImpl; @@ -100,7 +100,6 @@ import org.apache.hadoop.hive.ql.parse.ParseUtils; import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory; -import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hadoop.hive.ql.plan.TableDesc; @@ -108,11 +107,11 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessor; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.security.authorization.AuthorizationUtils; import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider; +import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzContext; import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType; import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject; import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivObjectActionType; import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType; -import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzContext; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.serde2.ByteStream; @@ -121,9 +120,7 @@ import org.apache.hadoop.mapred.ClusterStatus; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.MRJobConfig; - import org.apache.hive.common.util.ShutdownHookManager; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1809,9 +1806,9 @@ public class Driver implements CommandProcessor { ctx.setHDFSCleanup(true); this.driverCxt = driverCxt; // for canceling the query (should be bound to session?) 
- SessionState.get().setMapRedStats(new LinkedHashMap<String, MapRedStats>()); - SessionState.get().setStackTraces(new HashMap<String, List<List<String>>>()); - SessionState.get().setLocalMapRedErrors(new HashMap<String, List<String>>()); + SessionState.get().setMapRedStats(new LinkedHashMap<>()); + SessionState.get().setStackTraces(new HashMap<>()); + SessionState.get().setLocalMapRedErrors(new HashMap<>()); // Add root Tasks to runnable for (Task<? extends Serializable> tsk : plan.getRootTasks()) { @@ -2180,7 +2177,7 @@ public class Driver implements CommandProcessor { cxt.launching(tskRun); // Launch Task - if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.isMapRedTask()) { + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.canExecuteInParallel()) { // Launch it in the parallel mode, as a separate thread only for MR tasks if (LOG.isInfoEnabled()){ LOG.info("Starting task [" + tsk + "] in parallel"); http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java index f43992c..583d3d3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java @@ -160,7 +160,7 @@ public class DriverContext { public static boolean isLaunchable(Task<? extends Serializable> tsk) { // A launchable task is one that hasn't been queued, hasn't been // initialized, and is runnable. - return !tsk.getQueued() && !tsk.getInitialized() && tsk.isRunnable(); + return tsk.isNotInitialized() && tsk.isRunnable(); } public synchronized boolean addToRunnable(Task<? 
extends Serializable> tsk) throws HiveException { http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java index 2b2c004..e3b0d1e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java @@ -53,7 +53,6 @@ import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState; import org.apache.hadoop.hive.ql.plan.ColumnStatsWork; import org.apache.hadoop.hive.ql.plan.api.StageType; -import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -330,7 +329,7 @@ public class ColumnStatsTask extends Task<ColumnStatsWork> implements Serializab private List<ColumnStatistics> constructColumnStatsFromPackedRows( Hive db) throws HiveException, MetaException, IOException { - String currentDb = SessionState.get().getCurrentDatabase(); + String currentDb = work.getCurrentDatabaseName(); String tableName = work.getColStats().getTableName(); String partName = null; List<String> colName = work.getColStats().getColName(); http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java index 82fbf28..48a9c9a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java @@ -51,7 +51,6 @@ import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc; import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork; import org.apache.hadoop.hive.ql.plan.api.StageType; -import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,7 +77,7 @@ public class ColumnStatsUpdateTask extends Task<ColumnStatsUpdateWork> { private ColumnStatistics constructColumnStatsFromInput() throws SemanticException, MetaException { - String dbName = SessionState.get().getCurrentDatabase(); + String dbName = work.getCurrentDatabaseName(); ColumnStatsDesc desc = work.getColStats(); String tableName = desc.getTableName(); String partName = work.getPartName(); http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java index 52cb445..a51e69d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java @@ -21,9 +21,7 @@ package org.apache.hadoop.hive.ql.exec; import java.io.Serializable; import java.util.List; -import 
org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.DriverContext; -import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.ConditionalResolver; import org.apache.hadoop.hive.ql.plan.ConditionalWork; @@ -61,6 +59,11 @@ public class ConditionalTask extends Task<ConditionalWork> implements Serializab } @Override + public boolean canExecuteInParallel() { + return isMapRedTask(); + } + + @Override public boolean hasReduce() { for (Task<? extends Serializable> task : listTasks) { if (task.hasReduce()) { http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java index 2683f29..2e06ad0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java @@ -26,7 +26,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer; import org.apache.hadoop.hive.ql.plan.CopyWork; http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index ad5b3a3..443be54 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -4941,4 +4941,13 @@ public class DDLTask extends Task<DDLWork> implements Serializable { } return retval; } + + /* + Uses the authorizer from SessionState; it will need some more work to run in parallel, + however it should not be a bottleneck, so this might not need to be parallelized.
+ */ + @Override + public boolean canExecuteInParallel() { + return false; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainSQRewriteTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainSQRewriteTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainSQRewriteTask.java index 6fffab0..ec5ebb0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainSQRewriteTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainSQRewriteTask.java @@ -202,4 +202,9 @@ public class ExplainSQRewriteTask extends Task<ExplainSQRewriteWork> implements colList.add(tmpFieldSchema); return colList; } + + @Override + public boolean canExecuteInParallel() { + return false; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java index c25a783..da76c3c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java @@ -1230,4 +1230,9 @@ public class ExplainTask extends Task<ExplainWork> implements Serializable { colList.add(tmpFieldSchema); return colList; } + + @Override + public boolean canExecuteInParallel() { + return false; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java index e708d58..bde052b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java @@ -193,4 +193,8 @@ public class FetchTask extends Task<FetchWork> implements Serializable { } } + @Override + public boolean canExecuteInParallel() { + return false; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java index 0f990e6..bb0fff4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java @@ -357,4 +357,12 @@ public class FunctionTask extends Task<FunctionWork> { public String getName() { return "FUNCTION"; } + + /** + * This needs access to SessionState resource downloads, which in turn use references to Registry objects.
+ */ + @Override + public boolean canExecuteInParallel() { + return false; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index a1e4f96..c8ad795 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -18,22 +18,12 @@ package org.apache.hadoop.hive.ql.exec; -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; - import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.io.HdfsUtils; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; @@ -68,11 +58,19 @@ import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.api.StageType; -import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; + /** * MoveTask implementation. **/ @@ -458,7 +456,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable { dpCtx.getNumDPCols(), isSkewedStoredAsDirs(tbd), work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID, - SessionState.get().getTxnMgr().getCurrentTxnId(), hasFollowingStatsTask(), + work.getLoadTableWork().getCurrentTransactionId(), hasFollowingStatsTask(), work.getLoadTableWork().getWriteType()); // publish DP columns to its subscribers @@ -506,10 +504,10 @@ public class MoveTask extends Task<MoveWork> implements Serializable { dc = new DataContainer(table.getTTable(), partn.getTPartition()); // Don't set lineage on delete as we don't have all the columns - if (SessionState.get() != null && + if (work.getLineagState() != null && work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE && work.getLoadTableWork().getWriteType() != AcidUtils.Operation.UPDATE) { - SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc, + work.getLineagState().setLineage(tbd.getSourcePath(), dc, table.getCols()); } LOG.info("\tLoading partition " + entry.getKey()); @@ -540,7 +538,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable { } } } - if (SessionState.get() != null && dc != null) { + if (work.getLineagState() != null && dc != null) { // If we are doing an update or a delete the number of columns in the table will not // match the number of columns in the file sink. 
For update there will be one too many // (because of the ROW__ID), and in the case of the delete there will be just the @@ -551,14 +549,14 @@ public class MoveTask extends Task<MoveWork> implements Serializable { case UPDATE: // Pass an empty list as no columns will be written to the file. // TODO I should be able to make this work for update - tableCols = new ArrayList<FieldSchema>(); + tableCols = new ArrayList<>(); break; default: tableCols = table.getCols(); break; } - SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc, tableCols); + work.getLineagState().setLineage(tbd.getSourcePath(), dc, tableCols); } releaseLocks(tbd); } http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/NodeUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/NodeUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/NodeUtils.java index 249ffb3..a9f9fe2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/NodeUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/NodeUtils.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec; import org.apache.hadoop.hive.ql.lib.Node; +import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -33,7 +34,7 @@ import java.util.Set; public class NodeUtils { - public static <T> void iterateTask(Collection<Task<?>> tasks, Class<T> clazz, Function<T> function) { + public static <T> void iterateTask(Collection<Task<? extends Serializable>> tasks, Class<T> clazz, Function<T> function) { // Does a breadth first traversal of the tasks Set<Task> visited = new HashSet<Task>(); while (!tasks.isEmpty()) { @@ -42,7 +43,7 @@ public class NodeUtils { return; } - private static <T> Collection<Task<?>> iterateTask(Collection<Task<?>> tasks, + private static <T> Collection<Task<? 
extends Serializable>> iterateTask(Collection<Task<?>> tasks, Class<T> clazz, Function<T> function, Set<Task> visited) { @@ -92,7 +93,7 @@ public class NodeUtils { return childListNodes; } - public static interface Function<T> { + public interface Function<T> { void apply(T argument); } } http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java index e1bd291..6193b90 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java @@ -18,17 +18,7 @@ package org.apache.hadoop.hive.ql.exec; -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; - import org.apache.hadoop.hive.common.metrics.common.Metrics; -import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.DriverContext; @@ -47,6 +37,15 @@ import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; + /** * Task implementation. **/ @@ -113,7 +112,17 @@ public abstract class Task<T extends Serializable> implements Serializable, Node public enum FeedType { DYNAMIC_PARTITIONS, // list of dynamic partitions } + + /** + * Order of the States here is important as the ordinal values are used to + * determine the progression of taskState over its lifecycle, which is then + * used to make some decisions in Driver.execute. + */ public enum TaskState { + // Task state is unknown + UNKNOWN, + // Task is just created + CREATED, // Task data structures have been initialized INITIALIZED, // Task has been queued for execution by the driver @@ -121,11 +130,7 @@ public abstract class Task<T extends Serializable> implements Serializable, Node // Task is currently running RUNNING, // Task has completed - FINISHED, - // Task is just created - CREATED, - // Task state is unkown - UNKNOWN + FINISHED } // Bean methods @@ -366,38 +371,44 @@ public abstract class Task<T extends Serializable> implements Serializable, Node } } } - public void setStarted() { + + public synchronized void setStarted() { setState(TaskState.RUNNING); } - public boolean started() { + public synchronized boolean started() { return taskState == TaskState.RUNNING; } - public boolean done() { + public synchronized boolean done() { return taskState == TaskState.FINISHED; } - public void setDone() { + public synchronized void setDone() { setState(TaskState.FINISHED); } - public void setQueued() { + public synchronized void setQueued() { setState(TaskState.QUEUED); } - public boolean getQueued() { + public synchronized boolean getQueued() { return taskState == TaskState.QUEUED; } - public void setInitialized() { + public synchronized void setInitialized() { setState(TaskState.INITIALIZED); } - public boolean getInitialized() { + public synchronized boolean getInitialized() { return taskState == TaskState.INITIALIZED; }
+ public synchronized boolean isNotInitialized() { + return taskState.ordinal() < TaskState.INITIALIZED.ordinal(); + } + + public boolean isRunnable() { boolean isrunnable = true; if (parentTasks != null) { @@ -630,5 +641,7 @@ public abstract class Task<T extends Serializable> implements Serializable, Node return toString().equals(String.valueOf(obj)); } - + public boolean canExecuteInParallel(){ + return true; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java index 1bd4db7..41a1ef1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java @@ -36,7 +36,6 @@ import org.apache.hadoop.hive.common.io.CachingPrintStream; import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.exec.Operator; @@ -49,7 +48,6 @@ import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.ResourceType; import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hive.common.util.HiveStringUtils; import org.apache.hive.common.util.StreamPrinter; /** http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 3cae543..f9991d9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -356,4 +356,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { public StageType getType() { return StageType.REPL_DUMP; } + + @Override + public boolean canExecuteInParallel() { + return false; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java index ca2e992..4d8d06a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java @@ -70,7 +70,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { protected int execute(DriverContext driverContext) { try { int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS); - Context context = new Context(conf, getHive()); + Context context = new Context(conf, getHive(), work.sessionStateLineageState, + work.currentTransactionId); TaskTracker loadTaskTracker = new TaskTracker(maxTasks); /* for now for simplicity we are doing just one directory ( one database ), 
come back to use @@ -206,6 +207,11 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { work.updateDbEventState(null); } this.childTasks = scope.rootTasks; + /* + Since there can be multiple rounds of this run, all of which will be tied to the same + query id (generated in the compile phase), an additional UUID is added to the end to print + each run in separate files. + */ LOG.info("Root Tasks / Total Tasks : {} / {} ", childTasks.size(), loadTaskTracker.numberOfTasks()); // Populate the driver context with the scratch dir info from the repl context, so that the temp dirs will be cleaned up later http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java index a8e9067..c4ca121 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator; import org.apache.hadoop.hive.ql.plan.Explain; +import org.apache.hadoop.hive.ql.session.LineageState; import java.io.IOException; import java.io.Serializable; @@ -37,17 +38,28 @@ public class ReplLoadWork implements Serializable { private int loadTaskRunCount = 0; private DatabaseEvent.State state = null; + /* + These are SessionState objects that are copied over to the work object to allow for parallel + execution. Based on the current use case the methods are selectively synchronized, which might + need to be revisited when using other methods.
+ */ + final LineageState sessionStateLineageState; + public final long currentTransactionId; + public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoadIn, - String tableNameToLoadIn) throws IOException { + String tableNameToLoadIn, LineageState lineageState, long currentTransactionId) + throws IOException { this.tableNameToLoadIn = tableNameToLoadIn; + sessionStateLineageState = lineageState; + this.currentTransactionId = currentTransactionId; this.iterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf); this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf); this.dbNameToLoadIn = dbNameToLoadIn; } - public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameOrPattern) - throws IOException { - this(hiveConf, dumpDirectory, dbNameOrPattern, null); + public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameOrPattern, + LineageState lineageState, long currentTransactionId) throws IOException { + this(hiveConf, dumpDirectory, dbNameOrPattern, null, lineageState, currentTransactionId); } public BootstrapEventsIterator iterator() { http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index cef07ad..5c6ef9f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -238,10 +238,10 @@ public class LoadPartitions { Path tmpPath) { LoadTableDesc loadTableWork = new LoadTableDesc( tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(), - event.replicationSpec().isReplace() - ); + event.replicationSpec().isReplace()); loadTableWork.setInheritTableSpecs(false); - MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false); + MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false, + context.sessionStateLineageState); return TaskFactory.get(work, context.hiveConf); } http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index 2bf3784..a9a9162 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -224,10 +224,10 @@ public class LoadTable { ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf); LoadTableDesc loadTableWork = new LoadTableDesc( - tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), replicationSpec.isReplace() - ); + tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), replicationSpec.isReplace()); MoveWork moveWork = - new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false); + new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false, + context.sessionStateLineageState); Task<?> 
loadTableTask = TaskFactory.get(moveWork, context.hiveConf); copyTask.addDependentTask(loadTableTask); return copyTask; http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java index 8948b0c..5e3e31e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.session.LineageState; public class Context { public final HiveConf hiveConf; @@ -28,10 +29,22 @@ public class Context { public final Warehouse warehouse; public final PathInfo pathInfo; - public Context(HiveConf hiveConf, Hive hiveDb) throws MetaException { + /* + These are SessionState objects that are copied over to the work object to allow for parallel + execution. Based on the current use case the methods are selectively synchronized, which might + need to be revisited when using other methods. + */ + public final LineageState sessionStateLineageState; + public final long currentTransactionId; + + + public Context(HiveConf hiveConf, Hive hiveDb, + LineageState lineageState, long currentTransactionId) throws MetaException { this.hiveConf = hiveConf; this.hiveDb = hiveDb; this.warehouse = new Warehouse(hiveConf); this.pathInfo = new PathInfo(hiveConf); + sessionStateLineageState = lineageState; + this.currentTransactionId = currentTransactionId; } } http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java index 7305436..05b7d71 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/LineageInfo.java @@ -430,7 +430,7 @@ public class LineageInfo implements Serializable { /** * This class tracks the predicate information for an operator. */ - public static class Predicate { + public static class Predicate implements Serializable { /** * Expression string for the predicate.
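A pattern repeated across MoveWork, ReplLoadWork, ColumnStatsWork, ColumnStatsUpdateWork, LoadTableDesc, and the repl bootstrap Context in this commit: values that used to be read from the thread-local SessionState at execute time (the LineageState, the current transaction id, the current database name) are now captured once at compile time and carried on the work object, so a task running on a worker thread never calls SessionState.get(). That is presumably also why LineageInfo.Predicate, LineageCtx.Index, and AcidUtils.Operation pick up Serializable here: they now travel inside serialized work and state objects. A small sketch of the hand-over, using hypothetical names (SketchLineageState, SketchMoveWork are stand-ins, not the patch's classes):

    import java.io.Serializable;

    // Stand-in for LineageState: selectively synchronized because several
    // task threads may record lineage against the same session-scoped object.
    final class SketchLineageState implements Serializable {
      synchronized void setLineage(String sourcePath) {
        // record lineage for sourcePath (details elided)
      }
    }

    // Stand-in for MoveWork: session-scoped state arrives via the constructor
    // instead of being resolved through SessionState.get() at execute time.
    final class SketchMoveWork implements Serializable {
      private final SketchLineageState lineageState; // captured at compile time
      private final long currentTransactionId;       // likewise

      SketchMoveWork(SketchLineageState lineageState, long currentTransactionId) {
        this.lineageState = lineageState;
        this.currentTransactionId = currentTransactionId;
      }

      long getCurrentTransactionId() {
        return currentTransactionId;
      }

      void executeOnWorkerThread(String sourcePath) {
        // No SessionState.get() here: everything needed was handed over up front.
        lineageState.setLineage(sourcePath);
      }
    }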
http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index c64bc8c..89fbc14 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.io; import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -317,7 +318,7 @@ public class AcidUtils { return result; } - public enum Operation { + public enum Operation implements Serializable { NOT_ACID, INSERT, UPDATE, DELETE; } http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java index da99c23..0e86aac 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java @@ -40,7 +40,6 @@ import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.ql.session.SessionState; -import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.Counters; import org.apache.hadoop.mapred.FileInputFormat; http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index da153e3..25cca7b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -75,6 +75,7 @@ import org.apache.hadoop.hive.ql.io.merge.MergeFileWork; import org.apache.hadoop.hive.ql.io.orc.OrcFileStripeMergeInputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileBlockMergeInputFormat; +import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; @@ -1222,7 +1223,6 @@ public final class GenMapRedUtils { /** * @param fsInput The FileSink operator. - * @param ctx The MR processing context. * @param finalName the final destination path the merge job should output. * @param dependencyTask * @param mvTasks @@ -1309,8 +1309,10 @@ public final class GenMapRedUtils { // // 2. 
Constructing a conditional task consisting of a move task and a map reduce task // + HiveTxnManager txnMgr = SessionState.get().getTxnMgr(); MoveWork dummyMv = new MoveWork(null, null, null, - new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null), false); + new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, + null), false, SessionState.get().getLineageState()); MapWork cplan; Serializable work; http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java index 3c20532..5557138 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/LineageCtx.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.optimizer.lineage; +import java.io.Serializable; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -47,7 +48,7 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc; */ public class LineageCtx implements NodeProcessorCtx { - public static class Index { + public static class Index implements Serializable { /** * The map contains an index from the (operator, columnInfo) to the http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index 251deca..2c55011 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -592,7 +592,7 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { ColumnStatsDesc cStatsDesc = new ColumnStatsDesc(tbl.getDbName() + "." + tbl.getTableName(), Arrays.asList(colName), Arrays.asList(colType), partSpec == null); ColumnStatsUpdateTask cStatsUpdateTask = (ColumnStatsUpdateTask) TaskFactory - .get(new ColumnStatsUpdateWork(cStatsDesc, partName, mapProp), conf); + .get(new ColumnStatsUpdateWork(cStatsDesc, partName, mapProp, SessionState.get().getCurrentDatabase()), conf); rootTasks.add(cStatsUpdateTask); } @@ -1083,9 +1083,10 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { Path queryTmpdir = ctx.getExternalTmpPath(newTblPartLoc); truncateTblDesc.setOutputDir(queryTmpdir); LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc, - partSpec == null ? new HashMap<String, String>() : partSpec); + partSpec == null ? new HashMap<>() : partSpec); ltd.setLbCtx(lbCtx); - Task<MoveWork> moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), + Task<MoveWork> moveTsk = TaskFactory + .get(new MoveWork(null, null, ltd, null, false, SessionState.get().getLineageState()), conf); truncateTask.addDependentTask(moveTsk); @@ -1719,7 +1720,8 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc, partSpec == null ? 
new HashMap<>() : partSpec); ltd.setLbCtx(lbCtx); - Task<MoveWork> moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), + Task<MoveWork> moveTsk = TaskFactory + .get(new MoveWork(null, null, ltd, null, false, SessionState.get().getLineageState()), conf); mergeTask.addDependentTask(moveTsk); http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 89c4e2c..751bda0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -18,19 +18,6 @@ package org.apache.hadoop.hive.ql.parse; -import java.io.IOException; -import java.io.Serializable; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - import org.antlr.runtime.tree.Tree; import org.apache.commons.lang.ObjectUtils; import org.apache.hadoop.fs.FileStatus; @@ -70,6 +57,19 @@ import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.mapred.OutputFormat; import org.slf4j.Logger; +import java.io.IOException; +import java.io.Serializable; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + /** * ImportSemanticAnalyzer. 
* @@ -349,10 +349,11 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { Path tmpPath = x.getCtx().getExternalTmpPath(tgtPath); Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, x.getConf()); LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath, - Utilities.getTableDesc(table), new TreeMap<String, String>(), + Utilities.getTableDesc(table), new TreeMap<>(), replace); Task<?> loadTableTask = TaskFactory.get(new MoveWork(x.getInputs(), - x.getOutputs(), loadTableWork, null, false), x.getConf()); + x.getOutputs(), loadTableWork, null, false, SessionState.get().getLineageState()), + x.getConf()); copyTask.addDependentTask(loadTableTask); x.getTasks().add(copyTask); return loadTableTask; @@ -422,7 +423,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { partSpec.getPartSpec(), replicationSpec.isReplace()); loadTableWork.setInheritTableSpecs(false); Task<?> loadPartTask = TaskFactory.get(new MoveWork( - x.getInputs(), x.getOutputs(), loadTableWork, null, false), + x.getInputs(), x.getOutputs(), loadTableWork, null, false, + SessionState.get().getLineageState()), x.getConf()); copyTask.addDependentTask(loadPartTask); addPartTask.addDependentTask(loadPartTask); http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java index fa79700..8879b80 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java @@ -20,8 +20,6 @@ package org.apache.hadoop.hive.ql.parse; import org.apache.hadoop.hive.conf.HiveConf.StrictChecks; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; - import java.io.IOException; import java.io.Serializable; import java.net.URI; @@ -54,10 +52,10 @@ import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.StatsWork; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.mapred.InputFormat; import com.google.common.collect.Lists; -import org.apache.orc.impl.OrcAcidUtils; /** * LoadSemanticAnalyzer. @@ -285,8 +283,10 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer { loadTableWork.setInheritTableSpecs(false); } - Task<? extends Serializable> childTask = TaskFactory.get(new MoveWork(getInputs(), - getOutputs(), loadTableWork, null, true, isLocal), conf); + Task<? 
extends Serializable> childTask = TaskFactory.get( + new MoveWork(getInputs(), getOutputs(), loadTableWork, null, true, + isLocal, SessionState.get().getLineageState()), conf + ); if (rTask != null) { rTask.addDependentTask(childTask); } else { http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index fd97baa..e7e7f9b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hive.ql.plan.AlterTableDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.ql.session.SessionState; import java.io.FileNotFoundException; import java.io.Serializable; @@ -286,7 +287,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) { ReplLoadWork replLoadWork = - new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, tblNameOrPattern); + new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, tblNameOrPattern, + SessionState.get().getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId()); rootTasks.add(TaskFactory.get(replLoadWork, conf)); return; } @@ -316,7 +318,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { + " does not correspond to REPL LOAD expecting to load to a singular destination point."); } - ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern); + ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, + SessionState.get().getLineageState(), SessionState.get().getTxnMgr().getCurrentTxnId()); rootTasks.add(TaskFactory.get(replLoadWork, conf)); // // for (FileStatus dir : dirsInLoadPath) { http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index d56fd21..4814fcd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -6902,7 +6902,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest); checkAcidConstraints(qb, table_desc, dest_tab); } - ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp); + Long currentTransactionId = acidOp == Operation.NOT_ACID ? null : + SessionState.get().getTxnMgr().getCurrentTxnId(); + ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp, + currentTransactionId); // For Acid table, Insert Overwrite shouldn't replace the table content. 
We keep the old // deltas and base and leave them up to the cleaner to clean up ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(), @@ -7017,7 +7020,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest); checkAcidConstraints(qb, table_desc, dest_tab); } - ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp); + Long currentTransactionId = (acidOp == Operation.NOT_ACID) ? null : + SessionState.get().getTxnMgr().getCurrentTxnId(); + ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp, + currentTransactionId); // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old // deltas and base and leave them up to the cleaner to clean up ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(), http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java index 08a8f00..f0089fc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java @@ -18,20 +18,8 @@ package org.apache.hadoop.hive.ql.parse; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; -import java.util.Set; -import java.util.Stack; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.google.common.collect.Interner; +import com.google.common.collect.Interners; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.conf.HiveConf; @@ -42,7 +30,6 @@ import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.ColumnStatsTask; import org.apache.hadoop.hive.ql.exec.FetchTask; -import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.StatsTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -54,7 +41,6 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; -import org.apache.hadoop.hive.ql.optimizer.physical.AnnotateRunTimeStatsOptimizer; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.AnalyzeRewriteContext; import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc; import org.apache.hadoop.hive.ql.plan.ColumnStatsWork; @@ -76,9 +62,16 @@ import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.thrift.ThriftFormatter; import org.apache.hadoop.hive.serde2.thrift.ThriftJDBCBinarySerDe; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import com.google.common.collect.Interner; -import com.google.common.collect.Interners; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; /** * 
TaskCompiler is the base class for classes that compile @@ -107,7 +100,7 @@ public abstract class TaskCompiler { Context ctx = pCtx.getContext(); GlobalLimitCtx globalLimitCtx = pCtx.getGlobalLimitCtx(); - List<Task<MoveWork>> mvTask = new ArrayList<Task<MoveWork>>(); + List<Task<MoveWork>> mvTask = new ArrayList<>(); List<LoadTableDesc> loadTableWork = pCtx.getLoadTableWork(); List<LoadFileDesc> loadFileWork = pCtx.getLoadFileWork(); @@ -214,7 +207,9 @@ public abstract class TaskCompiler { } } else if (!isCStats) { for (LoadTableDesc ltd : loadTableWork) { - Task<MoveWork> tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf); + Task<MoveWork> tsk = TaskFactory + .get(new MoveWork(null, null, ltd, null, false, SessionState.get().getLineageState()), + conf); mvTask.add(tsk); // Check to see if we are stale'ing any indexes and auto-update them if we want if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEINDEXAUTOUPDATE)) { @@ -258,9 +253,7 @@ public abstract class TaskCompiler { } Warehouse wh = new Warehouse(conf); targetPath = wh.getDefaultTablePath(db.getDatabase(names[0]), names[1]); - } catch (HiveException e) { - throw new SemanticException(e); - } catch (MetaException e) { + } catch (HiveException | MetaException e) { throw new SemanticException(e); } @@ -272,7 +265,9 @@ public abstract class TaskCompiler { oneLoadFile = false; } - mvTask.add(TaskFactory.get(new MoveWork(null, null, null, lfd, false), conf)); + mvTask.add(TaskFactory + .get(new MoveWork(null, null, null, lfd, false, SessionState.get().getLineageState()), + conf)); } } @@ -296,7 +291,7 @@ public abstract class TaskCompiler { * a column stats task instead of a fetch task to persist stats to the metastore. */ if (isCStats || !pCtx.getColumnStatsAutoGatherContexts().isEmpty()) { - Set<Task<? extends Serializable>> leafTasks = new LinkedHashSet<Task<? extends Serializable>>(); + Set<Task<? extends Serializable>> leafTasks = new LinkedHashSet<>(); getLeafTasks(rootTasks, leafTasks); if (isCStats) { genColumnStatsTask(pCtx.getAnalyzeRewrite(), loadFileWork, leafTasks, outerQueryLimit, 0); @@ -384,7 +379,7 @@ public abstract class TaskCompiler { // find all leaf tasks and make the DDLTask as a dependent task of all of // them HashSet<Task<? extends Serializable>> leaves = - new LinkedHashSet<Task<? extends Serializable>>(); + new LinkedHashSet<>(); getLeafTasks(rootTasks, leaves); assert (leaves.size() > 0); for (Task<? extends Serializable> task : leaves) { @@ -411,19 +406,14 @@ public abstract class TaskCompiler { * This method generates a plan with a column stats task on top of map-red task and sets up the * appropriate metadata to be used during execution. * - * @param analyzeRewrite - * @param loadTableWork - * @param loadFileWork - * @param rootTasks - * @param outerQueryLimit */ @SuppressWarnings("unchecked") protected void genColumnStatsTask(AnalyzeRewriteContext analyzeRewrite, List<LoadFileDesc> loadFileWork, Set<Task<?
extends Serializable>> leafTasks, int outerQueryLimit, int numBitVector) { - ColumnStatsTask cStatsTask = null; - ColumnStatsWork cStatsWork = null; - FetchWork fetch = null; + ColumnStatsTask cStatsTask; + ColumnStatsWork cStatsWork; + FetchWork fetch; String tableName = analyzeRewrite.getTableName(); List<String> colName = analyzeRewrite.getColName(); List<String> colType = analyzeRewrite.getColType(); @@ -450,7 +440,7 @@ public abstract class TaskCompiler { ColumnStatsDesc cStatsDesc = new ColumnStatsDesc(tableName, colName, colType, isTblLevel, numBitVector); - cStatsWork = new ColumnStatsWork(fetch, cStatsDesc); + cStatsWork = new ColumnStatsWork(fetch, cStatsDesc, SessionState.get().getCurrentDatabase()); cStatsTask = (ColumnStatsTask) TaskFactory.get(cStatsWork, conf); for (Task<? extends Serializable> tsk : leafTasks) { tsk.addDependentTask(cStatsTask); @@ -461,7 +451,7 @@ public abstract class TaskCompiler { /** * Find all leaf tasks of the list of root tasks. */ - protected void getLeafTasks(List<Task<? extends Serializable>> rootTasks, + private void getLeafTasks(List<Task<? extends Serializable>> rootTasks, Set<Task<? extends Serializable>> leaves) { for (Task<? extends Serializable> root : rootTasks) { http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java index 8db2889..5f9041e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.plan; import java.io.Serializable; -import java.util.List; import java.util.Map; import org.apache.hadoop.hive.ql.plan.Explain.Level; @@ -38,12 +37,14 @@ public class ColumnStatsUpdateWork implements Serializable { private ColumnStatsDesc colStats; private String partName; private Map<String, String> mapProp; + private String currentDatabaseName; public ColumnStatsUpdateWork(ColumnStatsDesc colStats, String partName, - Map<String, String> mapProp) { + Map<String, String> mapProp, String currentDatabaseName) { this.partName = partName; this.colStats = colStats; this.mapProp = mapProp; + this.currentDatabaseName = currentDatabaseName; } @Override @@ -64,4 +65,11 @@ public class ColumnStatsUpdateWork implements Serializable { return mapProp; } + public String getCurrentDatabaseName() { + return currentDatabaseName; + } + + public void setCurrentDatabaseName(String currentDatabaseName) { + this.currentDatabaseName = currentDatabaseName; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java index 76811b1..842fd1a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java @@ -34,15 +34,17 @@ public class ColumnStatsWork implements Serializable { private static final long serialVersionUID = 1L; private FetchWork fWork; private ColumnStatsDesc colStats; + private String currentDatabaseName; private static final int LIMIT = -1; public 
ColumnStatsWork() { } - public ColumnStatsWork(FetchWork work, ColumnStatsDesc colStats) { + public ColumnStatsWork(FetchWork work, ColumnStatsDesc colStats, String currentDatabaseName) { this.fWork = work; this.setColStats(colStats); + this.currentDatabaseName = currentDatabaseName; } @Override @@ -85,4 +87,11 @@ public class ColumnStatsWork implements Serializable { return LIMIT; } + public String getCurrentDatabaseName() { + return currentDatabaseName; + } + + public void setCurrentDatabaseName(String currentDatabaseName) { + this.currentDatabaseName = currentDatabaseName; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java index 45d4fb0..023d247 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java @@ -35,7 +35,7 @@ public class LoadDesc implements Serializable { * Need to remember whether this is an acid compliant operation, and if so whether it is an * insert, update, or delete. */ - private final AcidUtils.Operation writeType; + final AcidUtils.Operation writeType; public LoadDesc(final Path sourcePath, AcidUtils.Operation writeType) { this.sourcePath = sourcePath; http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java index ab0a92a..90a970c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java @@ -18,14 +18,14 @@ package org.apache.hadoop.hive.ql.plan; -import java.io.Serializable; -import java.util.LinkedHashMap; -import java.util.Map; - import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.plan.Explain.Level; +import java.io.Serializable; +import java.util.LinkedHashMap; +import java.util.Map; + /** * LoadTableDesc. 
* @@ -37,6 +37,10 @@ public class LoadTableDesc extends LoadDesc implements Serializable { private ListBucketingCtx lbCtx; private boolean inheritTableSpecs = true; //For partitions, flag controlling whether the current //table specs are to be used + /* + if the writeType above is NOT_ACID then the currentTransactionId will be null + */ + private final Long currentTransactionId; // TODO: the below seems like they should just be combined into partitionDesc private org.apache.hadoop.hive.ql.plan.TableDesc table; @@ -49,16 +53,18 @@ public class LoadTableDesc extends LoadDesc implements Serializable { this.dpCtx = o.dpCtx; this.lbCtx = o.lbCtx; this.inheritTableSpecs = o.inheritTableSpecs; + this.currentTransactionId = o.currentTransactionId; this.table = o.table; this.partitionSpec = o.partitionSpec; } public LoadTableDesc(final Path sourcePath, - final org.apache.hadoop.hive.ql.plan.TableDesc table, + final TableDesc table, final Map<String, String> partitionSpec, final boolean replace, - final AcidUtils.Operation writeType) { + final AcidUtils.Operation writeType, Long currentTransactionId) { super(sourcePath, writeType); + this.currentTransactionId = currentTransactionId; init(table, partitionSpec, replace); } @@ -70,17 +76,18 @@ public class LoadTableDesc extends LoadDesc implements Serializable { * @param replace */ public LoadTableDesc(final Path sourcePath, - final TableDesc table, - final Map<String, String> partitionSpec, - final boolean replace) { - this(sourcePath, table, partitionSpec, replace, AcidUtils.Operation.NOT_ACID); + final TableDesc table, + final Map<String, String> partitionSpec, + final boolean replace) { + this(sourcePath, table, partitionSpec, replace, AcidUtils.Operation.NOT_ACID, + null); } public LoadTableDesc(final Path sourcePath, - final org.apache.hadoop.hive.ql.plan.TableDesc table, + final TableDesc table, final Map<String, String> partitionSpec, - final AcidUtils.Operation writeType) { - this(sourcePath, table, partitionSpec, true, writeType); + final AcidUtils.Operation writeType, Long currentTransactionId) { + this(sourcePath, table, partitionSpec, true, writeType, currentTransactionId); } /** @@ -90,21 +97,22 @@ public class LoadTableDesc extends LoadDesc implements Serializable { * @param partitionSpec */ public LoadTableDesc(final Path sourcePath, - final org.apache.hadoop.hive.ql.plan.TableDesc table, - final Map<String, String> partitionSpec) { - this(sourcePath, table, partitionSpec, true, AcidUtils.Operation.NOT_ACID); + final TableDesc table, + final Map<String, String> partitionSpec) { + this(sourcePath, table, partitionSpec, true, AcidUtils.Operation.NOT_ACID, null); } public LoadTableDesc(final Path sourcePath, - final org.apache.hadoop.hive.ql.plan.TableDesc table, + final TableDesc table, final DynamicPartitionCtx dpCtx, - final AcidUtils.Operation writeType) { + final AcidUtils.Operation writeType, Long currentTransactionId) { super(sourcePath, writeType); this.dpCtx = dpCtx; + this.currentTransactionId = currentTransactionId; if (dpCtx != null && dpCtx.getPartSpec() != null && partitionSpec == null) { init(table, dpCtx.getPartSpec(), true); } else { - init(table, new LinkedHashMap<String, String>(), true); + init(table, new LinkedHashMap<>(), true); } } @@ -174,4 +182,7 @@ public class LoadTableDesc extends LoadDesc implements Serializable { this.lbCtx = lbCtx; } + public long getCurrentTransactionId() { + return writeType == AcidUtils.Operation.NOT_ACID ? 
0L : currentTransactionId; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java index 8ce211f..00c0ce3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java @@ -26,7 +26,7 @@ import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.plan.Explain.Level; - +import org.apache.hadoop.hive.ql.session.LineageState; /** * MoveWork. @@ -38,6 +38,12 @@ public class MoveWork implements Serializable { private LoadTableDesc loadTableWork; private LoadFileDesc loadFileWork; private LoadMultiFilesDesc loadMultiFilesWork; + /* + These are SessionState objects that are copied into the work object to allow for parallel + execution. For the current use case only selected methods are synchronized; this needs to be + taken into account when using the other methods. + */ + private final LineageState sessionStateLineageState; private boolean checkFileFormat; private boolean srcLocal; @@ -57,17 +63,20 @@ public class MoveWork implements Serializable { protected List<Partition> movedParts; public MoveWork() { + sessionStateLineageState = null; } - public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs) { + private MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs, + LineageState lineageState) { this.inputs = inputs; this.outputs = outputs; + sessionStateLineageState = lineageState; } public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs, final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork, - boolean checkFileFormat, boolean srcLocal) { - this(inputs, outputs); + boolean checkFileFormat, boolean srcLocal, LineageState lineageState) { + this(inputs, outputs, lineageState); this.loadTableWork = loadTableWork; this.loadFileWork = loadFileWork; this.checkFileFormat = checkFileFormat; @@ -76,8 +85,8 @@ public class MoveWork implements Serializable { public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs, final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork, - boolean checkFileFormat) { - this(inputs, outputs); + boolean checkFileFormat, LineageState lineageState) { + this(inputs, outputs, lineageState); this.loadTableWork = loadTableWork; this.loadFileWork = loadFileWork; this.checkFileFormat = checkFileFormat; @@ -91,6 +100,7 @@ public class MoveWork implements Serializable { srcLocal = o.isSrcLocal(); inputs = o.getInputs(); outputs = o.getOutputs(); + sessionStateLineageState = o.sessionStateLineageState; } @Explain(displayName = "tables", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) @@ -152,4 +162,7 @@ public class MoveWork implements Serializable { this.srcLocal = srcLocal; } + public LineageState getLineagState() { + return sessionStateLineageState; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/39d8d73e/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java index e2f2a68..056d614 100644 ---
a/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/LineageState.java @@ -21,11 +21,11 @@ package org.apache.hadoop.hive.ql.session; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.io.Serializable; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.ql.exec.ColumnInfo; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.hooks.LineageInfo; import org.apache.hadoop.hive.ql.hooks.LineageInfo.DataContainer; @@ -36,7 +36,7 @@ import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx.Index; * lineage information for the post execution hooks. * */ -public class LineageState { +public class LineageState implements Serializable { /** * Mapping from the directory name to FileSinkOperator (may not be FileSinkOperator for views). This * @@ -44,7 +44,7 @@ public class LineageState { * time and is then later used to create the mapping from * movetask to the set of filesink operators. */ - private final Map<Path, Operator> dirToFop; + private final Map<String, Operator> dirToFop; /** * The lineage context index for this query. @@ -60,8 +60,8 @@ public class LineageState { /** * Constructor. */ - public LineageState() { - dirToFop = new HashMap<Path, Operator>(); + LineageState() { + dirToFop = new HashMap<>(); linfo = new LineageInfo(); index = new Index(); } @@ -72,8 +72,8 @@ public class LineageState { * @param dir The directory name. * @param fop The sink operator. */ - public void mapDirToOp(Path dir, Operator fop) { - dirToFop.put(dir, fop); + public synchronized void mapDirToOp(Path dir, Operator fop) { + dirToFop.put(dir.toUri().toString(), fop); } /** @@ -83,10 +83,10 @@ public class LineageState { * @param newPath conditional input path * @param oldPath path of the old linked MoveWork */ - public void updateDirToOpMap(Path newPath, Path oldPath) { - Operator op = dirToFop.get(oldPath); + public synchronized void updateDirToOpMap(Path newPath, Path oldPath) { + Operator op = dirToFop.get(oldPath.toUri().toString()); if (op != null) { - dirToFop.put(newPath, op); + dirToFop.put(newPath.toUri().toString(), op); } } /** @@ -97,10 +97,10 @@ public class LineageState { * @param dc The associated data container. * @param cols The list of columns. */ - public void setLineage(Path dir, DataContainer dc, + public synchronized void setLineage(Path dir, DataContainer dc, List<FieldSchema> cols) { // First lookup the file sink operator from the load work. - Operator<?> op = dirToFop.get(dir); + Operator<?> op = dirToFop.get(dir.toUri().toString()); // Go over the associated fields and look up the dependencies // by position in the row schema of the filesink operator. @@ -136,7 +136,7 @@ public class LineageState { /** * Clear all lineage states */ - public void clear() { + public synchronized void clear() { dirToFop.clear(); linfo.clear(); index.clear();
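----------------------------------------------------------------------

The sketches below show how the new plumbing is intended to be called. Helper class and method names, sample paths, and any other identifier not present in the hunks above are hypothetical, and each sketch assumes a Hive build at this commit on the classpath.

First, REPL LOAD: the analyzer now snapshots the session's LineageState and the current transaction id while still on the session thread, so the load task does not have to reach into the thread-local SessionState when it later runs on a worker thread.

    import java.io.Serializable;
    import java.util.List;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.Task;
    import org.apache.hadoop.hive.ql.exec.TaskFactory;
    import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork;
    import org.apache.hadoop.hive.ql.session.SessionState;

    final class ReplLoadSketch {
      // Mirrors the database-level branch of REPL LOAD analysis above.
      static void addReplLoadTask(HiveConf conf, Path loadPath, String dbNameOrPattern,
          List<Task<? extends Serializable>> rootTasks) throws Exception {
        ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern,
            SessionState.get().getLineageState(),              // snapshot taken on the session thread
            SessionState.get().getTxnMgr().getCurrentTxnId()); // ditto for the transaction id
        rootTasks.add(TaskFactory.get(replLoadWork, conf));
      }
    }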
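Next, the currentTransactionId convention on LoadTableDesc: callers pass null when the write is not ACID, and getCurrentTransactionId() folds that case to 0L. A minimal sketch (loadDescFor and the empty partition spec are illustrative):

    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils;
    import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
    import org.apache.hadoop.hive.ql.plan.TableDesc;
    import org.apache.hadoop.hive.ql.session.SessionState;

    final class LoadTableDescSketch {
      static LoadTableDesc loadDescFor(Path queryTmpdir, TableDesc tableDesc,
          AcidUtils.Operation acidOp) {
        // null means "no transaction": the NOT_ACID case.
        Long currentTransactionId = acidOp == AcidUtils.Operation.NOT_ACID
            ? null : SessionState.get().getTxnMgr().getCurrentTxnId();
        Map<String, String> partSpec = new LinkedHashMap<>(); // no static partition spec
        LoadTableDesc ltd =
            new LoadTableDesc(queryTmpdir, tableDesc, partSpec, acidOp, currentTransactionId);
        // getCurrentTransactionId() returns 0L, not null, for NOT_ACID writes.
        assert acidOp != AcidUtils.Operation.NOT_ACID || ltd.getCurrentTransactionId() == 0L;
        return ltd;
      }
    }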
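MoveWork gets the same treatment. TaskCompiler passes the session's LineageState into the constructor, and a move task running in parallel reads it back through the work object, via the getLineagState() accessor added above, instead of through SessionState. A sketch with stand-in ltd and conf:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.Task;
    import org.apache.hadoop.hive.ql.exec.TaskFactory;
    import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
    import org.apache.hadoop.hive.ql.plan.MoveWork;
    import org.apache.hadoop.hive.ql.session.LineageState;
    import org.apache.hadoop.hive.ql.session.SessionState;

    final class MoveWorkSketch {
      static Task<MoveWork> moveTaskFor(LoadTableDesc ltd, HiveConf conf) {
        MoveWork mw = new MoveWork(null, null, ltd, null, false,
            SessionState.get().getLineageState()); // captured at compile time
        LineageState shared = mw.getLineagState(); // what the executing task will see
        assert shared != null;
        return TaskFactory.get(mw, conf);
      }
    }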
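Column statistics follow suit: the current database is resolved once, on the session thread, and travels in ColumnStatsWork (and likewise in ColumnStatsUpdateWork) rather than being re-read from SessionState at execution time. A sketch, with fetch and cStatsDesc standing in for the objects built in genColumnStatsTask:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.ColumnStatsTask;
    import org.apache.hadoop.hive.ql.exec.TaskFactory;
    import org.apache.hadoop.hive.ql.plan.ColumnStatsDesc;
    import org.apache.hadoop.hive.ql.plan.ColumnStatsWork;
    import org.apache.hadoop.hive.ql.plan.FetchWork;
    import org.apache.hadoop.hive.ql.session.SessionState;

    final class ColumnStatsSketch {
      static ColumnStatsTask statsTaskFor(FetchWork fetch, ColumnStatsDesc cStatsDesc,
          HiveConf conf) {
        ColumnStatsWork cStatsWork =
            new ColumnStatsWork(fetch, cStatsDesc, SessionState.get().getCurrentDatabase());
        // At run time the task calls cStatsWork.getCurrentDatabaseName() instead of
        // asking the thread-local SessionState for the current database.
        return (ColumnStatsTask) TaskFactory.get(cStatsWork, conf);
      }
    }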
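Finally, the reworked LineageState. Its mutators are synchronized so one instance can be shared by tasks running in parallel, and entries are keyed by dir.toUri().toString() rather than by Path (presumably because Path does not implement Serializable while the class now does). A sketch, with a hypothetical operator and staging paths:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.session.LineageState;
    import org.apache.hadoop.hive.ql.session.SessionState;

    final class LineageStateSketch {
      static void relinkOutput(Operator<?> fop) {
        LineageState lineage = SessionState.get().getLineageState();
        // Register the sink under its staging directory.
        lineage.mapDirToOp(new Path("hdfs://nn:8020/tmp/q1/-ext-10000"), fop);
        // If a conditional merge re-routes the output, re-link the operator:
        lineage.updateDirToOpMap(new Path("hdfs://nn:8020/tmp/q1/-ext-10002"),
            new Path("hdfs://nn:8020/tmp/q1/-ext-10000"));
        // Both lookups key on dir.toUri().toString(), so the same location matches
        // even when it is represented by different Path instances.
      }
    }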
