HIVE-12758 : Parallel compilation: Operator::resetId() is not thread-safe (Sergey Shelukhin, reviewed by Gopal V)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8271c63d Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8271c63d Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8271c63d Branch: refs/heads/branch-2.0 Commit: 8271c63df33f736e43cb6102dba8a7838d2bd0c6 Parents: cb2de9b Author: Sergey Shelukhin <ser...@apache.org> Authored: Mon Jan 18 18:37:53 2016 -0800 Committer: Sergey Shelukhin <ser...@apache.org> Committed: Mon Jan 18 18:47:51 2016 -0800 ---------------------------------------------------------------------- .../mapreduce/TestHCatMultiOutputFormat.java | 4 +- .../hadoop/hive/ql/CompilationOpContext.java | 36 +++ .../java/org/apache/hadoop/hive/ql/Context.java | 5 + .../java/org/apache/hadoop/hive/ql/Driver.java | 10 +- .../hive/ql/exec/AbstractFileMergeOperator.java | 10 + .../hive/ql/exec/AbstractMapJoinOperator.java | 9 +- .../hive/ql/exec/AppMasterEventOperator.java | 10 + .../hadoop/hive/ql/exec/CollectOperator.java | 10 + .../hadoop/hive/ql/exec/ColumnStatsTask.java | 8 +- .../hive/ql/exec/ColumnStatsUpdateTask.java | 6 +- .../hadoop/hive/ql/exec/CommonJoinOperator.java | 12 +- .../hive/ql/exec/CommonMergeJoinOperator.java | 8 +- .../hadoop/hive/ql/exec/ConditionalTask.java | 5 - .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 19 +- .../hadoop/hive/ql/exec/DemuxOperator.java | 10 + .../hadoop/hive/ql/exec/DummyStoreOperator.java | 8 +- .../apache/hadoop/hive/ql/exec/FetchTask.java | 8 +- .../hadoop/hive/ql/exec/FileSinkOperator.java | 10 + .../hadoop/hive/ql/exec/FilterOperator.java | 8 +- .../hadoop/hive/ql/exec/ForwardOperator.java | 10 + .../hadoop/hive/ql/exec/FunctionTask.java | 6 +- .../hadoop/hive/ql/exec/GroupByOperator.java | 10 + .../hive/ql/exec/HashTableDummyOperator.java | 10 + .../hive/ql/exec/HashTableSinkOperator.java | 12 +- .../hadoop/hive/ql/exec/JoinOperator.java | 10 + .../ql/exec/LateralViewForwardOperator.java | 10 + .../hive/ql/exec/LateralViewJoinOperator.java | 10 + .../hadoop/hive/ql/exec/LimitOperator.java | 10 + .../hadoop/hive/ql/exec/ListSinkOperator.java | 10 + .../hadoop/hive/ql/exec/MapJoinOperator.java | 9 +- .../apache/hadoop/hive/ql/exec/MapOperator.java | 10 + .../apache/hadoop/hive/ql/exec/MuxOperator.java | 10 + .../apache/hadoop/hive/ql/exec/Operator.java | 57 ++-- .../hadoop/hive/ql/exec/OperatorFactory.java | 287 ++++++++++--------- .../hive/ql/exec/OrcFileMergeOperator.java | 10 + .../apache/hadoop/hive/ql/exec/PTFOperator.java | 10 + .../hive/ql/exec/RCFileMergeOperator.java | 11 + .../hadoop/hive/ql/exec/ReduceSinkOperator.java | 10 + .../hadoop/hive/ql/exec/SMBMapJoinOperator.java | 9 +- .../hadoop/hive/ql/exec/ScriptOperator.java | 10 + .../hadoop/hive/ql/exec/SelectOperator.java | 10 + .../hive/ql/exec/SerializationUtilities.java | 51 +++- .../ql/exec/SparkHashTableSinkOperator.java | 12 +- .../hadoop/hive/ql/exec/StatsNoJobTask.java | 6 +- .../hadoop/hive/ql/exec/TableScanOperator.java | 10 + .../org/apache/hadoop/hive/ql/exec/Task.java | 4 +- .../hive/ql/exec/TemporaryHashSinkOperator.java | 4 +- .../hadoop/hive/ql/exec/TerminalOperator.java | 9 + .../hive/ql/exec/TezDummyStoreOperator.java | 10 + .../hadoop/hive/ql/exec/UDTFOperator.java | 10 + .../hadoop/hive/ql/exec/UnionOperator.java | 12 +- .../hadoop/hive/ql/exec/mr/ExecDriver.java | 6 +- .../hadoop/hive/ql/exec/mr/ExecMapper.java | 6 +- .../hadoop/hive/ql/exec/mr/HashTableLoader.java | 3 +- .../hadoop/hive/ql/exec/mr/MapredLocalTask.java | 6 +- .../hive/ql/exec/spark/HashTableLoader.java | 3 +- .../ql/exec/spark/SparkMapRecordHandler.java | 6 +- .../hadoop/hive/ql/exec/spark/SparkTask.java | 6 +- .../hive/ql/exec/tez/MapRecordProcessor.java | 10 +- .../vector/VectorAppMasterEventOperator.java | 15 +- .../ql/exec/vector/VectorFileSinkOperator.java | 14 +- .../ql/exec/vector/VectorFilterOperator.java | 14 +- .../ql/exec/vector/VectorGroupByOperator.java | 15 +- .../ql/exec/vector/VectorLimitOperator.java | 12 +- .../exec/vector/VectorMapJoinBaseOperator.java | 14 +- .../ql/exec/vector/VectorMapJoinOperator.java | 14 +- .../VectorMapJoinOuterFilteredOperator.java | 14 +- .../hive/ql/exec/vector/VectorMapOperator.java | 10 + .../exec/vector/VectorReduceSinkOperator.java | 14 +- .../exec/vector/VectorSMBMapJoinOperator.java | 14 +- .../ql/exec/vector/VectorSelectOperator.java | 14 +- .../VectorSparkHashTableSinkOperator.java | 14 +- ...VectorSparkPartitionPruningSinkOperator.java | 15 +- .../mapjoin/VectorMapJoinCommonOperator.java | 14 +- .../VectorMapJoinGenerateResultOperator.java | 14 +- ...pJoinInnerBigOnlyGenerateResultOperator.java | 14 +- .../VectorMapJoinInnerBigOnlyLongOperator.java | 13 +- ...ctorMapJoinInnerBigOnlyMultiKeyOperator.java | 13 +- ...VectorMapJoinInnerBigOnlyStringOperator.java | 13 +- ...ectorMapJoinInnerGenerateResultOperator.java | 14 +- .../mapjoin/VectorMapJoinInnerLongOperator.java | 13 +- .../VectorMapJoinInnerMultiKeyOperator.java | 13 +- .../VectorMapJoinInnerStringOperator.java | 13 +- ...orMapJoinLeftSemiGenerateResultOperator.java | 14 +- .../VectorMapJoinLeftSemiLongOperator.java | 13 +- .../VectorMapJoinLeftSemiMultiKeyOperator.java | 13 +- .../VectorMapJoinLeftSemiStringOperator.java | 13 +- ...ectorMapJoinOuterGenerateResultOperator.java | 14 +- .../mapjoin/VectorMapJoinOuterLongOperator.java | 13 +- .../VectorMapJoinOuterMultiKeyOperator.java | 13 +- .../VectorMapJoinOuterStringOperator.java | 13 +- .../VectorReduceSinkCommonOperator.java | 14 +- .../VectorReduceSinkLongOperator.java | 14 +- .../VectorReduceSinkMultiKeyOperator.java | 14 +- .../VectorReduceSinkStringOperator.java | 14 +- .../apache/hadoop/hive/ql/hooks/ATSHook.java | 2 +- .../hadoop/hive/ql/io/merge/MergeFileTask.java | 5 +- .../ql/io/rcfile/stats/PartialScanTask.java | 7 +- .../io/rcfile/truncate/ColumnTruncateTask.java | 5 +- .../hive/ql/optimizer/AbstractSMBJoinProc.java | 2 +- .../hive/ql/optimizer/ConvertJoinMapJoin.java | 8 +- .../DynamicPartitionPruningOptimization.java | 4 +- .../hive/ql/optimizer/GenMRTableScan1.java | 2 +- .../hive/ql/optimizer/GenMapRedUtils.java | 23 +- .../hive/ql/optimizer/GroupByOptimizer.java | 4 +- .../hive/ql/optimizer/MapJoinProcessor.java | 12 +- .../ql/optimizer/ReduceSinkMapJoinProc.java | 4 +- .../ql/optimizer/SimpleFetchAggregation.java | 3 +- .../hive/ql/optimizer/SimpleFetchOptimizer.java | 3 +- .../hive/ql/optimizer/SkewJoinOptimizer.java | 8 +- .../calcite/translator/HiveGBOpConvUtil.java | 10 +- .../calcite/translator/HiveOpConverter.java | 34 +-- .../QueryPlanTreeTransformation.java | 12 +- .../physical/CommonJoinTaskDispatcher.java | 4 +- .../physical/GenMRSkewJoinProcessor.java | 7 +- .../physical/GenSparkSkewJoinProcessor.java | 13 +- .../physical/LocalMapJoinProcFactory.java | 5 +- .../hive/ql/optimizer/physical/Vectorizer.java | 15 +- .../spark/SparkReduceSinkMapJoinProc.java | 7 +- .../optimizer/unionproc/UnionProcFactory.java | 3 +- .../hive/ql/parse/BaseSemanticAnalyzer.java | 2 + .../hive/ql/parse/ExplainSemanticAnalyzer.java | 2 +- .../apache/hadoop/hive/ql/parse/GenTezWork.java | 5 +- .../hive/ql/parse/ProcessAnalyzeTable.java | 3 +- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 31 +- .../hive/ql/parse/spark/GenSparkWork.java | 3 +- .../SparkPartitionPruningSinkOperator.java | 10 + .../parse/spark/SparkProcessAnalyzeTable.java | 3 +- .../apache/hadoop/hive/ql/plan/BaseWork.java | 1 + .../hadoop/hive/ql/plan/ColumnStatsWork.java | 5 +- .../apache/hadoop/hive/ql/plan/FetchWork.java | 5 +- .../org/apache/hadoop/hive/ql/plan/MapWork.java | 5 + .../apache/hadoop/hive/ql/plan/MapredWork.java | 7 +- .../hadoop/hive/ql/plan/MergeJoinWork.java | 5 + .../apache/hadoop/hive/ql/plan/ReduceWork.java | 6 + .../apache/hadoop/hive/ql/plan/UnionWork.java | 5 + .../hadoop/hive/ql/ppd/OpProcFactory.java | 3 +- .../ql/ppd/PredicateTransitivePropagate.java | 4 +- .../hive/ql/ppd/SyntheticJoinPredicate.java | 4 +- .../hadoop/hive/ql/exec/TestExecDriver.java | 38 +-- .../hive/ql/exec/TestFileSinkOperator.java | 4 +- .../hadoop/hive/ql/exec/TestOperators.java | 17 +- .../apache/hadoop/hive/ql/exec/TestPlan.java | 3 +- .../exec/vector/TestVectorFilterOperator.java | 3 +- .../exec/vector/TestVectorGroupByOperator.java | 58 ++-- .../ql/exec/vector/TestVectorLimitOperator.java | 3 +- .../exec/vector/TestVectorSelectOperator.java | 10 +- .../vector/util/FakeCaptureOutputOperator.java | 14 +- .../util/FakeVectorDataSourceOperator.java | 16 +- .../ql/optimizer/physical/TestVectorizer.java | 7 +- .../hadoop/hive/ql/parse/TestGenTezWork.java | 8 +- .../parse/TestUpdateDeleteSemanticAnalyzer.java | 2 +- .../hive/ql/testutil/BaseScalarUdfTest.java | 3 +- .../results/clientpositive/auto_join0.q.out | 8 +- .../cbo_rp_cross_product_check_2.q.out | 4 +- .../clientpositive/cross_product_check_2.q.out | 4 +- .../subquery_multiinsert.q.java1.7.out | 4 +- .../subquery_multiinsert.q.java1.8.out | 60 ++-- 158 files changed, 1379 insertions(+), 598 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java ---------------------------------------------------------------------- diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java index 8148faa..61efc1a 100644 --- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java +++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMultiOutputFormat.java @@ -26,7 +26,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Random; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; @@ -40,6 +39,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.FetchTask; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -392,7 +392,7 @@ public class TestHCatMultiOutputFormat { } FetchTask task = new FetchTask(); task.setWork(work); - task.initialize(conf, null, null); + task.initialize(conf, null, null, new CompilationOpContext()); task.fetch(temp); for (String str : temp) { results.add(str.replace("\t", ",")); http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/CompilationOpContext.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/CompilationOpContext.java b/ql/src/java/org/apache/hadoop/hive/ql/CompilationOpContext.java new file mode 100644 index 0000000..949f873 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/CompilationOpContext.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A subset of compilation context that is passed to operators to get rid of some globals. + * Perhaps this should be rolled into main Context; however, some code necessitates storing the + * context in the operators for now, so this may not be advisable given how much stuff the main + * Context class contains. + * For now, only the operator sequence ID lives here. + */ +public class CompilationOpContext { + private final AtomicInteger opSeqId = new AtomicInteger(0); + + public int nextOperatorId() { + return opSeqId.getAndIncrement(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/Context.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index affaec8..746456b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -67,6 +67,7 @@ public class Context { private int resDirFilesNum; boolean initialized; String originalTracker = null; + private final CompilationOpContext opContext; private final Map<String, ContentSummary> pathToCS = new ConcurrentHashMap<String, ContentSummary>(); // scratch path to use for all non-local (ie. hdfs) file system tmp folders @@ -133,6 +134,7 @@ public class Context { localScratchDir = new Path(SessionState.getLocalSessionPath(conf), executionId).toUri().getPath(); scratchDirPermission = HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION); stagingDir = HiveConf.getVar(conf, HiveConf.ConfVars.STAGINGDIR); + opContext = new CompilationOpContext(); } @@ -715,4 +717,7 @@ public class Context { this.cboSucceeded = cboSucceeded; } + public CompilationOpContext getOpContext() { + return opContext; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index af6f8b2..04db836 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -176,7 +176,7 @@ public class Driver implements CommandProcessor { @Override public void init() { - Operator.resetId(); + // Nothing for now. } /** @@ -501,7 +501,7 @@ public class Driver implements CommandProcessor { // initialize FetchTask right here if (plan.getFetchTask() != null) { - plan.getFetchTask().initialize(conf, plan, null); + plan.getFetchTask().initialize(conf, plan, null, ctx.getOpContext()); } //do the authorization check @@ -579,7 +579,7 @@ public class Driver implements CommandProcessor { ASTNode astTree) throws IOException { String ret = null; ExplainTask task = new ExplainTask(); - task.initialize(conf, plan, null); + task.initialize(conf, plan, null, ctx.getOpContext()); ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintStream ps = new PrintStream(baos); try { @@ -1820,7 +1820,7 @@ public class Driver implements CommandProcessor { cxt.incCurJobNo(1); console.printInfo("Launching Job " + cxt.getCurJobNo() + " out of " + jobs); } - tsk.initialize(conf, plan, cxt); + tsk.initialize(conf, plan, cxt, ctx.getOpContext()); TaskResult tskRes = new TaskResult(); TaskRunner tskRun = new TaskRunner(tsk, tskRes); @@ -1910,7 +1910,7 @@ public class Driver implements CommandProcessor { throw new IOException("Error closing the current fetch task", e); } // FetchTask should not depend on the plan. - fetchTask.initialize(conf, null, null); + fetchTask.initialize(conf, null, null, ctx.getOpContext()); } else { ctx.resetStream(); resStream = null; http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java index a3ec0e1..f99bf11 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; import org.apache.hadoop.hive.ql.plan.FileMergeDesc; @@ -61,6 +62,15 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc> protected Set<Path> incompatFileSet; protected transient DynamicPartitionCtx dpCtx; + /** Kryo ctor. */ + protected AbstractFileMergeOperator() { + super(); + } + + public AbstractFileMergeOperator(CompilationOpContext ctx) { + super(ctx); + } + @Override public void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java index 7302688..69ba4a2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.persistence.RowContainer; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; @@ -50,7 +51,13 @@ public abstract class AbstractMapJoinOperator <T extends MapJoinDesc> extends Co transient int numMapRowsRead; - public AbstractMapJoinOperator() { + /** Kryo ctor. */ + protected AbstractMapJoinOperator() { + super(); + } + + public AbstractMapJoinOperator(CompilationOpContext ctx) { + super(ctx); } public AbstractMapJoinOperator(AbstractMapJoinOperator<? extends MapJoinDesc> mjop) { http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java index 7114177..743098b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AppMasterEventOperator.java @@ -27,6 +27,7 @@ import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.tez.TezContext; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc; @@ -51,6 +52,15 @@ public class AppMasterEventOperator extends Operator<AppMasterEventDesc> { protected transient boolean hasReachedMaxSize = false; protected transient long MAX_SIZE; + /** Kryo ctor. */ + protected AppMasterEventOperator() { + super(); + } + + public AppMasterEventOperator(CompilationOpContext ctx) { + super(ctx); + } + @Override public void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java index e2f4f58..27ddf13 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.CollectDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; @@ -42,6 +43,15 @@ public class CollectOperator extends Operator<CollectDesc> implements protected transient ObjectInspector standardRowInspector; transient int maxSize; + /** Kryo ctor. */ + protected CollectOperator() { + super(); + } + + public CollectOperator(CompilationOpContext ctx) { + super(ctx); + } + @Override protected void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java index f6fbe74..7914471 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest; import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -81,9 +82,10 @@ public class ColumnStatsTask extends Task<ColumnStatsWork> implements Serializab } @Override - public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) { - super.initialize(conf, queryPlan, ctx); - work.initializeForFetch(); + public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx, + CompilationOpContext opContext) { + super.initialize(conf, queryPlan, ctx, opContext); + work.initializeForFetch(opContext); try { JobConf job = new JobConf(conf); ftOp = new FetchOperator(work.getfWork(), job); http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java index dcbbe2e..a1b98f4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData; import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -68,8 +69,9 @@ public class ColumnStatsUpdateTask extends Task<ColumnStatsUpdateWork> { .getLogger(ColumnStatsUpdateTask.class); @Override - public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) { - super.initialize(conf, queryPlan, ctx); + public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx, + CompilationOpContext opContext) { + super.initialize(conf, queryPlan, ctx, opContext); } private ColumnStatistics constructColumnStatsFromInput() http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java index b0170f5..f8520f8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java @@ -31,6 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.persistence.AbstractRowContainer; import org.apache.hadoop.hive.ql.exec.persistence.RowContainer; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -125,17 +126,23 @@ public abstract class CommonJoinOperator<T extends JoinDesc> extends protected transient int heartbeatInterval; protected static final int NOTSKIPBIGTABLE = -1; - public CommonJoinOperator() { + /** Kryo ctor. */ + protected CommonJoinOperator() { + super(); + } + + public CommonJoinOperator(CompilationOpContext ctx) { + super(ctx); } public CommonJoinOperator(CommonJoinOperator<T> clone) { + super(clone.id, clone.cContext); this.joinEmitInterval = clone.joinEmitInterval; this.joinCacheSize = clone.joinCacheSize; this.nextSz = clone.nextSz; this.childOperators = clone.childOperators; this.parentOperators = clone.parentOperators; this.done = clone.done; - this.operatorId = clone.operatorId; this.storage = clone.storage; this.condn = clone.condn; this.conf = clone.getConf(); @@ -150,7 +157,6 @@ public abstract class CommonJoinOperator<T extends JoinDesc> extends this.groupKeyObject = clone.groupKeyObject; this.handleSkewJoin = clone.handleSkewJoin; this.hconf = clone.hconf; - this.id = clone.id; this.inputObjInspectors = clone.inputObjInspectors; this.noOuterJoin = clone.noOuterJoin; this.numAliases = clone.numAliases; http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java index 1cbd13d..8693200 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java @@ -33,6 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.persistence.RowContainer; import org.apache.hadoop.hive.ql.exec.tez.RecordSource; import org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource; @@ -90,10 +91,15 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge new ArrayList<Operator<? extends OperatorDesc>>(); transient Set<Integer> fetchInputAtClose; - public CommonMergeJoinOperator() { + /** Kryo ctor. */ + protected CommonMergeJoinOperator() { super(); } + public CommonMergeJoinOperator(CompilationOpContext ctx) { + super(ctx); + } + @SuppressWarnings("unchecked") @Override public void initializeOp(Configuration hconf) throws HiveException { http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java index 031331e..c96c813 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java @@ -72,11 +72,6 @@ public class ConditionalTask extends Task<ConditionalWork> implements Serializab } @Override - public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) { - super.initialize(conf, queryPlan, driverContext); - } - - @Override public int execute(DriverContext driverContext) { resTasks = resolver.getTasks(conf, resolverCtx); resolved = true; http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 341cc61..28532c2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -64,6 +64,7 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement; import org.apache.hadoop.hive.metastore.api.SkewedInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.TxnInfo; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.ErrorMsg; @@ -258,8 +259,9 @@ public class DDLTask extends Task<DDLWork> implements Serializable { } @Override - public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) { - super.initialize(conf, queryPlan, ctx); + public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx, + CompilationOpContext opContext) { + super.initialize(conf, queryPlan, ctx, opContext); // Pick the formatter to use to display the results. Either the // normal human readable output or a json object. @@ -505,7 +507,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable { AlterTablePartMergeFilesDesc mergeFilesDesc = work.getMergeFilesDesc(); if (mergeFilesDesc != null) { - return mergeFiles(db, mergeFilesDesc); + return mergeFiles(db, mergeFilesDesc, driverContext); } AlterTableAlterPartDesc alterPartDesc = work.getAlterTableAlterPartDesc(); @@ -596,8 +598,8 @@ public class DDLTask extends Task<DDLWork> implements Serializable { * @return * @throws HiveException */ - private int mergeFiles(Hive db, AlterTablePartMergeFilesDesc mergeFilesDesc) - throws HiveException { + private int mergeFiles(Hive db, AlterTablePartMergeFilesDesc mergeFilesDesc, + DriverContext driverContext) throws HiveException { ListBucketingCtx lbCtx = mergeFilesDesc.getLbCtx(); boolean lbatc = lbCtx == null ? false : lbCtx.isSkewedStoredAsDir(); int lbd = lbCtx == null ? 0 : lbCtx.calculateListBucketingLevel(); @@ -629,7 +631,8 @@ public class DDLTask extends Task<DDLWork> implements Serializable { fmd.setListBucketingDepth(lbd); fmd.setOutputPath(mergeFilesDesc.getOutputDir()); - Operator<? extends OperatorDesc> mergeOp = OperatorFactory.get(fmd); + CompilationOpContext opContext = driverContext.getCtx().getOpContext(); + Operator<? extends OperatorDesc> mergeOp = OperatorFactory.get(opContext, fmd); LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork = new LinkedHashMap<String, Operator<? extends OperatorDesc>>(); @@ -649,7 +652,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable { } // initialize the task and execute - task.initialize(db.getConf(), getQueryPlan(), driverCxt); + task.initialize(db.getConf(), getQueryPlan(), driverCxt, opContext); int ret = task.execute(driverCxt); return ret; } @@ -4242,7 +4245,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable { truncateWork.setMapperCannotSpanPartns(true); DriverContext driverCxt = new DriverContext(); ColumnTruncateTask taskExec = new ColumnTruncateTask(); - taskExec.initialize(db.getConf(), null, driverCxt); + taskExec.initialize(db.getConf(), null, driverCxt, null); taskExec.setWork(truncateWork); taskExec.setQueryPlan(this.getQueryPlan()); return taskExec.execute(driverCxt); http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java index 0888c7b..b897c16 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DemuxOperator.java @@ -30,6 +30,7 @@ import java.util.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.DemuxDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; @@ -109,6 +110,15 @@ public class DemuxOperator extends Operator<DemuxDesc> // its children's parents lists, also see childOperatorsTag in Operator) at here. private int[][] newChildOperatorsTag; + /** Kryo ctor. */ + protected DemuxOperator() { + super(); + } + + public DemuxOperator(CompilationOpContext ctx) { + super(ctx); + } + @Override protected void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java index 0c12570..06a3884 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DummyStoreOperator.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.DummyStoreDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; @@ -69,10 +70,15 @@ public class DummyStoreOperator extends Operator<DummyStoreDesc> implements Seri protected transient InspectableObject result; - public DummyStoreOperator() { + /** Kryo ctor. */ + protected DummyStoreOperator() { super(); } + public DummyStoreOperator(CompilationOpContext ctx) { + super(ctx); + } + @Override protected void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java index 1634143..4415328 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java @@ -26,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.CommandNeedRetryException; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.exec.mr.ExecMapper; @@ -59,9 +60,10 @@ public class FetchTask extends Task<FetchWork> implements Serializable { } @Override - public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) { - super.initialize(conf, queryPlan, ctx); - work.initializeForFetch(); + public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx, + CompilationOpContext opContext) { + super.initialize(conf, queryPlan, ctx, opContext); + work.initializeForFetch(opContext); try { // Create a file system handle http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 32bfcf5..2fa3d96 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; @@ -322,6 +323,15 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements childSpecPathDynLinkedPartitions = conf.getDirName().getName(); } + /** Kryo ctor. */ + protected FileSinkOperator() { + super(); + } + + public FileSinkOperator(CompilationOpContext ctx) { + super(ctx); + } + @Override protected void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java index 0e7e79d..08f2633 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java @@ -24,6 +24,7 @@ import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.io.IOContext; import org.apache.hadoop.hive.ql.io.IOContextMap; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -45,8 +46,13 @@ public class FilterOperator extends Operator<FilterDesc> implements private transient IOContext ioContext; protected transient int heartbeatInterval; - public FilterOperator() { + /** Kryo ctor. */ + protected FilterOperator() { super(); + } + + public FilterOperator(CompilationOpContext ctx) { + super(ctx); consecutiveSearches = 0; } http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java index 7a4c58a..2df7cca 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.ForwardDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; @@ -61,6 +62,15 @@ public class ForwardOperator extends Operator<ForwardDesc> implements return "FOR"; } + /** Kryo ctor. */ + protected ForwardOperator() { + super(); + } + + public ForwardOperator(CompilationOpContext ctx) { + super(ctx); + } + @Override protected void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java index ec755a8..ed6f062 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.metastore.api.PrincipalType; import org.apache.hadoop.hive.metastore.api.ResourceType; import org.apache.hadoop.hive.metastore.api.ResourceUri; import org.apache.hadoop.hive.ql.exec.FunctionInfo.FunctionResource; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -60,8 +61,9 @@ public class FunctionTask extends Task<FunctionWork> { } @Override - public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) { - super.initialize(conf, queryPlan, ctx); + public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx, + CompilationOpContext opContext) { + super.initialize(conf, queryPlan, ctx, opContext); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java index 1693ec3..0839b42 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java @@ -37,6 +37,7 @@ import javolution.util.FastBitSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.parse.OpParseContext; import org.apache.hadoop.hive.ql.plan.AggregationDesc; @@ -179,6 +180,15 @@ public class GroupByOperator extends Operator<GroupByDesc> { return bits; } + /** Kryo ctor. */ + protected GroupByOperator() { + super(); + } + + public GroupByOperator(CompilationOpContext ctx) { + super(ctx); + } + @Override protected void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java index 1de8c76..4749247 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.HashTableDummyDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; @@ -32,6 +33,15 @@ import org.apache.hadoop.hive.serde2.SerDeUtils; public class HashTableDummyOperator extends Operator<HashTableDummyDesc> implements Serializable { private static final long serialVersionUID = 1L; + /** Kryo ctor. */ + protected HashTableDummyOperator() { + super(); + } + + public HashTableDummyOperator(CompilationOpContext ctx) { + super(ctx); + } + @Override protected void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java index 76308f6..deb7c76 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler; import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinEagerRowContainer; @@ -104,10 +105,17 @@ public class HashTableSinkOperator extends TerminalOperator<HashTableSinkDesc> i private long hashTableScale; private MapJoinMemoryExhaustionHandler memoryExhaustionHandler; - public HashTableSinkOperator() { + /** Kryo ctor. */ + protected HashTableSinkOperator() { + super(); } - public HashTableSinkOperator(MapJoinOperator mjop) { + public HashTableSinkOperator(CompilationOpContext ctx) { + super(ctx); + } + + public HashTableSinkOperator(CompilationOpContext ctx, MapJoinOperator mjop) { + this(ctx); this.conf = new HashTableSinkDesc(mjop.getConf()); } http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java index 3453fc9..08cc4b4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.JoinDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; @@ -55,6 +56,15 @@ public class JoinOperator extends CommonJoinOperator<JoinDesc> implements Serial private final transient LongWritable skewjoin_followup_jobs = new LongWritable(0); + /** Kryo ctor. */ + protected JoinOperator() { + super(); + } + + public JoinOperator(CompilationOpContext ctx) { + super(ctx); + } + @Override protected void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewForwardOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewForwardOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewForwardOperator.java index e866eed..4c94ad9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewForwardOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewForwardOperator.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.LateralViewForwardDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; @@ -55,6 +56,15 @@ public class LateralViewForwardOperator extends Operator<LateralViewForwardDesc> return OperatorType.LATERALVIEWFORWARD; } + /** Kryo ctor. */ + protected LateralViewForwardOperator() { + super(); + } + + public LateralViewForwardOperator(CompilationOpContext ctx) { + super(ctx); + } + @Override protected void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java index 55bb08f..7407dc6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.LateralViewJoinDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; @@ -83,6 +84,15 @@ public class LateralViewJoinOperator extends Operator<LateralViewJoinDesc> { public static final byte SELECT_TAG = 0; public static final byte UDTF_TAG = 1; + /** Kryo ctor. */ + protected LateralViewJoinOperator() { + super(); + } + + public LateralViewJoinOperator(CompilationOpContext ctx) { + super(ctx); + } + @Override protected void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java index fc85bea..239d56b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.LimitDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; @@ -39,6 +40,15 @@ public class LimitOperator extends Operator<LimitDesc> implements Serializable { protected transient int currCount; protected transient boolean isMap; + /** Kryo ctor. */ + protected LimitOperator() { + super(); + } + + public LimitOperator(CompilationOpContext ctx) { + super(ctx); + } + @Override protected void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java index 919e72f..2f2abc1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ListSinkOperator.java @@ -24,6 +24,7 @@ import java.util.Properties; import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.ListSinkDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; @@ -43,6 +44,15 @@ public class ListSinkOperator extends Operator<ListSinkDesc> { private transient FetchFormatter fetcher; private transient int numRows; + /** Kryo ctor. */ + protected ListSinkOperator() { + super(); + } + + public ListSinkOperator(CompilationOpContext ctx) { + super(ctx); + } + @Override protected void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index dc0b85e..91b5ca7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.HashTableLoaderFactory; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.exec.persistence.BytesBytesMultiHashMap; @@ -97,7 +98,13 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem protected HybridHashTableContainer firstSmallTable; // The first small table; // Only this table has spilled big table rows - public MapJoinOperator() { + /** Kryo ctor. */ + protected MapJoinOperator() { + super(); + } + + public MapJoinOperator(CompilationOpContext ctx) { + super(ctx); } public MapJoinOperator(AbstractMapJoinOperator<? extends MapJoinDesc> mjop) { http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java index 99724c1..3bd96b9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java @@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.MapOperator.MapOpCtx; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.io.RecordIdentifier; @@ -460,6 +461,15 @@ public class MapOperator extends Operator<MapWork> implements Serializable, Clon return nominal; } + /** Kryo ctor. */ + protected MapOperator() { + super(); + } + + public MapOperator(CompilationOpContext ctx) { + super(ctx); + } + @Override public void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java index 4f4abd3..d8444fb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MuxOperator.java @@ -28,6 +28,7 @@ import java.util.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.MuxDesc; @@ -170,6 +171,15 @@ public class MuxOperator extends Operator<MuxDesc> implements Serializable{ private transient long[] cntrs; private transient long[] nextCntrs; + /** Kryo ctor. */ + protected MuxOperator() { + super(); + } + + public MuxOperator(CompilationOpContext ctx) { + super(ctx); + } + @Override protected void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf); http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index 9a86a35..0c7f52c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -30,11 +30,11 @@ import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -67,6 +67,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C public static final String CONTEXT_NAME_KEY = "__hive.context.name"; private transient Configuration configuration; + protected transient CompilationOpContext cContext; protected List<Operator<? extends OperatorDesc>> childOperators; protected List<Operator<? extends OperatorDesc>> parentOperators; protected String operatorId; @@ -75,8 +76,6 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C private transient boolean rootInitializeCalled = false; protected final transient Collection<Future<?>> asyncInitOperations = new HashSet<>(); - private static AtomicInteger seqId; - // It can be optimized later so that an operator operator (init/close) is performed // only after that operation has been performed on all the parents. This will require // initializing the whole tree in all the mappers (which might be required for mappers @@ -98,38 +97,24 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C protected transient State state = State.UNINIT; - static { - seqId = new AtomicInteger(0); - } - private boolean useBucketizedHiveInputFormat; // dummy operator (for not increasing seqId) - private Operator(String name) { - id = name; + protected Operator(String name, CompilationOpContext cContext) { + this(); + this.cContext = cContext; + this.id = name; initOperatorId(); + } + + protected Operator() { childOperators = new ArrayList<Operator<? extends OperatorDesc>>(); parentOperators = new ArrayList<Operator<? extends OperatorDesc>>(); abortOp = new AtomicBoolean(false); } - public Operator() { - this(String.valueOf(seqId.getAndIncrement())); - } - - public static void resetId() { - seqId.set(0); - } - - /** - * Create an operator with a reporter. - * - * @param reporter - * Used to report progress of certain operators. - */ - public Operator(Reporter reporter) { - this(); - this.reporter = reporter; + public Operator(CompilationOpContext cContext) { + this(String.valueOf(cContext.nextOperatorId()), cContext); } public void setChildOperators( @@ -228,7 +213,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C protected transient final boolean isLogTraceEnabled = LOG.isTraceEnabled() && PLOG.isTraceEnabled(); protected transient String alias; protected transient Reporter reporter; - protected transient String id; + protected String id; // object inspectors for input rows // We will increase the size of the array on demand protected transient ObjectInspector[] inputObjInspectors = new ObjectInspector[1]; @@ -1129,8 +1114,8 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C @SuppressWarnings("unchecked") T descClone = (T)conf.clone(); // also clone the colExprMap by default - Operator<? extends OperatorDesc> ret = - OperatorFactory.getAndMakeChild(descClone, getSchema(), getColumnExprMap(), parentClones); + Operator<? extends OperatorDesc> ret = OperatorFactory.getAndMakeChild( + cContext, descClone, getSchema(), getColumnExprMap(), parentClones); return ret; } @@ -1145,8 +1130,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C public Operator<? extends OperatorDesc> cloneOp() throws CloneNotSupportedException { T descClone = (T) conf.clone(); Operator<? extends OperatorDesc> ret = - OperatorFactory.getAndMakeChild( - descClone, getSchema()); + OperatorFactory.getAndMakeChild(cContext, descClone, getSchema()); return ret; } @@ -1355,7 +1339,7 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C @SuppressWarnings({ "serial", "unchecked", "rawtypes" }) private static class DummyOperator extends Operator { - public DummyOperator() { super("dummy"); } + public DummyOperator() { super("dummy", null); } @Override public void process(Object row, int tag) { @@ -1384,4 +1368,13 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C public String getReduceOutputName() { return null; } + + public void setCompilationOpContext(CompilationOpContext ctx) { + cContext = ctx; + } + + /** @return Compilation operator context. Only available during compilation. */ + public CompilationOpContext getCompilationOpContext() { + return cContext; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java index f619a56..038b96c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java @@ -19,11 +19,13 @@ package org.apache.hadoop.hive.ql.exec; import java.util.ArrayList; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.vector.VectorAppMasterEventOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator; @@ -72,6 +74,8 @@ import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.UDTFDesc; import org.apache.hadoop.hive.ql.plan.UnionDesc; +import com.google.common.base.Preconditions; + /** * OperatorFactory. * @@ -79,97 +83,68 @@ import org.apache.hadoop.hive.ql.plan.UnionDesc; @SuppressWarnings({ "rawtypes", "unchecked" }) public final class OperatorFactory { protected static transient final Logger LOG = LoggerFactory.getLogger(OperatorFactory.class); - private static final List<OpTuple> opvec; - private static final List<OpTuple> vectorOpvec; + private static final IdentityHashMap<Class<? extends OperatorDesc>, + Class<? extends Operator<? extends OperatorDesc>>> opvec = new IdentityHashMap<>(); + private static final IdentityHashMap<Class<? extends OperatorDesc>, + Class<? extends Operator<? extends OperatorDesc>>> vectorOpvec = new IdentityHashMap<>(); static { - opvec = new ArrayList<OpTuple>(); - opvec.add(new OpTuple<FilterDesc>(FilterDesc.class, FilterOperator.class)); - opvec.add(new OpTuple<SelectDesc>(SelectDesc.class, SelectOperator.class)); - opvec.add(new OpTuple<ForwardDesc>(ForwardDesc.class, ForwardOperator.class)); - opvec.add(new OpTuple<FileSinkDesc>(FileSinkDesc.class, FileSinkOperator.class)); - opvec.add(new OpTuple<CollectDesc>(CollectDesc.class, CollectOperator.class)); - opvec.add(new OpTuple<ScriptDesc>(ScriptDesc.class, ScriptOperator.class)); - opvec.add(new OpTuple<PTFDesc>(PTFDesc.class, PTFOperator.class)); - opvec.add(new OpTuple<ReduceSinkDesc>(ReduceSinkDesc.class, ReduceSinkOperator.class)); - opvec.add(new OpTuple<GroupByDesc>(GroupByDesc.class, GroupByOperator.class)); - opvec.add(new OpTuple<JoinDesc>(JoinDesc.class, JoinOperator.class)); - opvec.add(new OpTuple<MapJoinDesc>(MapJoinDesc.class, MapJoinOperator.class)); - opvec.add(new OpTuple<SMBJoinDesc>(SMBJoinDesc.class, SMBMapJoinOperator.class)); - opvec.add(new OpTuple<LimitDesc>(LimitDesc.class, LimitOperator.class)); - opvec.add(new OpTuple<TableScanDesc>(TableScanDesc.class, TableScanOperator.class)); - opvec.add(new OpTuple<UnionDesc>(UnionDesc.class, UnionOperator.class)); - opvec.add(new OpTuple<UDTFDesc>(UDTFDesc.class, UDTFOperator.class)); - opvec.add(new OpTuple<LateralViewJoinDesc>(LateralViewJoinDesc.class, - LateralViewJoinOperator.class)); - opvec.add(new OpTuple<LateralViewForwardDesc>(LateralViewForwardDesc.class, - LateralViewForwardOperator.class)); - opvec.add(new OpTuple<HashTableDummyDesc>(HashTableDummyDesc.class, - HashTableDummyOperator.class)); - opvec.add(new OpTuple<HashTableSinkDesc>(HashTableSinkDesc.class, - HashTableSinkOperator.class)); - opvec.add(new OpTuple<SparkHashTableSinkDesc>(SparkHashTableSinkDesc.class, - SparkHashTableSinkOperator.class)); - opvec.add(new OpTuple<DummyStoreDesc>(DummyStoreDesc.class, - DummyStoreOperator.class)); - opvec.add(new OpTuple<DemuxDesc>(DemuxDesc.class, - DemuxOperator.class)); - opvec.add(new OpTuple<MuxDesc>(MuxDesc.class, - MuxOperator.class)); - opvec.add(new OpTuple<AppMasterEventDesc>(AppMasterEventDesc.class, - AppMasterEventOperator.class)); - opvec.add(new OpTuple<DynamicPruningEventDesc>(DynamicPruningEventDesc.class, - AppMasterEventOperator.class)); - opvec.add(new OpTuple<SparkPartitionPruningSinkDesc>(SparkPartitionPruningSinkDesc.class, - SparkPartitionPruningSinkOperator.class)); - opvec.add(new OpTuple<RCFileMergeDesc>(RCFileMergeDesc.class, - RCFileMergeOperator.class)); - opvec.add(new OpTuple<OrcFileMergeDesc>(OrcFileMergeDesc.class, - OrcFileMergeOperator.class)); - opvec.add(new OpTuple<CommonMergeJoinDesc>(CommonMergeJoinDesc.class, - CommonMergeJoinOperator.class)); - opvec.add(new OpTuple<ListSinkDesc>(ListSinkDesc.class, - ListSinkOperator.class)); + opvec.put(FilterDesc.class, FilterOperator.class); + opvec.put(SelectDesc.class, SelectOperator.class); + opvec.put(ForwardDesc.class, ForwardOperator.class); + opvec.put(FileSinkDesc.class, FileSinkOperator.class); + opvec.put(CollectDesc.class, CollectOperator.class); + opvec.put(ScriptDesc.class, ScriptOperator.class); + opvec.put(PTFDesc.class, PTFOperator.class); + opvec.put(ReduceSinkDesc.class, ReduceSinkOperator.class); + opvec.put(GroupByDesc.class, GroupByOperator.class); + opvec.put(JoinDesc.class, JoinOperator.class); + opvec.put(MapJoinDesc.class, MapJoinOperator.class); + opvec.put(SMBJoinDesc.class, SMBMapJoinOperator.class); + opvec.put(LimitDesc.class, LimitOperator.class); + opvec.put(TableScanDesc.class, TableScanOperator.class); + opvec.put(UnionDesc.class, UnionOperator.class); + opvec.put(UDTFDesc.class, UDTFOperator.class); + opvec.put(LateralViewJoinDesc.class, LateralViewJoinOperator.class); + opvec.put(LateralViewForwardDesc.class, LateralViewForwardOperator.class); + opvec.put(HashTableDummyDesc.class, HashTableDummyOperator.class); + opvec.put(HashTableSinkDesc.class, HashTableSinkOperator.class); + opvec.put(SparkHashTableSinkDesc.class, SparkHashTableSinkOperator.class); + opvec.put(DummyStoreDesc.class, DummyStoreOperator.class); + opvec.put(DemuxDesc.class, DemuxOperator.class); + opvec.put(MuxDesc.class, MuxOperator.class); + opvec.put(AppMasterEventDesc.class, AppMasterEventOperator.class); + opvec.put(DynamicPruningEventDesc.class, AppMasterEventOperator.class); + opvec.put(SparkPartitionPruningSinkDesc.class, SparkPartitionPruningSinkOperator.class); + opvec.put(RCFileMergeDesc.class, RCFileMergeOperator.class); + opvec.put(OrcFileMergeDesc.class, OrcFileMergeOperator.class); + opvec.put(CommonMergeJoinDesc.class, CommonMergeJoinOperator.class); + opvec.put(ListSinkDesc.class, ListSinkOperator.class); } static { - vectorOpvec = new ArrayList<OpTuple>(); - vectorOpvec.add(new OpTuple<AppMasterEventDesc>(AppMasterEventDesc.class, - VectorAppMasterEventOperator.class)); - vectorOpvec.add(new OpTuple<DynamicPruningEventDesc>(DynamicPruningEventDesc.class, - VectorAppMasterEventOperator.class)); - vectorOpvec.add(new OpTuple<SparkPartitionPruningSinkDesc>( - SparkPartitionPruningSinkDesc.class, - VectorSparkPartitionPruningSinkOperator.class)); - vectorOpvec.add(new OpTuple<SelectDesc>(SelectDesc.class, VectorSelectOperator.class)); - vectorOpvec.add(new OpTuple<GroupByDesc>(GroupByDesc.class, VectorGroupByOperator.class)); - vectorOpvec.add(new OpTuple<MapJoinDesc>(MapJoinDesc.class, VectorMapJoinOperator.class)); - vectorOpvec.add(new OpTuple<SMBJoinDesc>(SMBJoinDesc.class, VectorSMBMapJoinOperator.class)); - vectorOpvec.add(new OpTuple<ReduceSinkDesc>(ReduceSinkDesc.class, - VectorReduceSinkOperator.class)); - vectorOpvec.add(new OpTuple<FileSinkDesc>(FileSinkDesc.class, VectorFileSinkOperator.class)); - vectorOpvec.add(new OpTuple<FilterDesc>(FilterDesc.class, VectorFilterOperator.class)); - vectorOpvec.add(new OpTuple<LimitDesc>(LimitDesc.class, VectorLimitOperator.class)); - vectorOpvec.add(new OpTuple<SparkHashTableSinkDesc>(SparkHashTableSinkDesc.class, - VectorSparkHashTableSinkOperator.class)); - } - - private static final class OpTuple<T extends OperatorDesc> { - private final Class<T> descClass; - private final Class<? extends Operator<?>> opClass; - - public OpTuple(Class<T> descClass, Class<? extends Operator<?>> opClass) { - this.descClass = descClass; - this.opClass = opClass; - } + vectorOpvec.put(AppMasterEventDesc.class, VectorAppMasterEventOperator.class); + vectorOpvec.put(DynamicPruningEventDesc.class, VectorAppMasterEventOperator.class); + vectorOpvec.put( + SparkPartitionPruningSinkDesc.class, VectorSparkPartitionPruningSinkOperator.class); + vectorOpvec.put(SelectDesc.class, VectorSelectOperator.class); + vectorOpvec.put(GroupByDesc.class, VectorGroupByOperator.class); + vectorOpvec.put(MapJoinDesc.class, VectorMapJoinOperator.class); + vectorOpvec.put(SMBJoinDesc.class, VectorSMBMapJoinOperator.class); + vectorOpvec.put(ReduceSinkDesc.class, VectorReduceSinkOperator.class); + vectorOpvec.put(FileSinkDesc.class, VectorFileSinkOperator.class); + vectorOpvec.put(FilterDesc.class, VectorFilterOperator.class); + vectorOpvec.put(LimitDesc.class, VectorLimitOperator.class); + vectorOpvec.put(SparkHashTableSinkDesc.class, VectorSparkHashTableSinkOperator.class); } public static <T extends OperatorDesc> Operator<T> getVectorOperator( - Class<? extends Operator<?>> opClass, T conf, VectorizationContext vContext) throws HiveException { + Class<? extends Operator<?>> opClass, CompilationOpContext cContext, T conf, + VectorizationContext vContext) throws HiveException { try { Operator<T> op = (Operator<T>) opClass.getDeclaredConstructor( - VectorizationContext.class, OperatorDesc.class).newInstance( - vContext, conf); + CompilationOpContext.class, VectorizationContext.class, OperatorDesc.class) + .newInstance(cContext, vContext, conf); return op; } catch (Exception e) { e.printStackTrace(); @@ -177,50 +152,49 @@ public final class OperatorFactory { } } - public static <T extends OperatorDesc> Operator<T> getVectorOperator(T conf, - VectorizationContext vContext) throws HiveException { + public static <T extends OperatorDesc> Operator<T> getVectorOperator( + CompilationOpContext cContext, T conf, VectorizationContext vContext) throws HiveException { Class<T> descClass = (Class<T>) conf.getClass(); - for (OpTuple o : vectorOpvec) { - if (o.descClass == descClass) { - return getVectorOperator(o.opClass, conf, vContext); - } + Class<?> opClass = vectorOpvec.get(descClass); + if (opClass != null) { + return getVectorOperator(vectorOpvec.get(descClass), cContext, conf, vContext); } - throw new HiveException("No vector operator for descriptor class " - + descClass.getName()); + throw new HiveException("No vector operator for descriptor class " + descClass.getName()); } - public static <T extends OperatorDesc> Operator<T> get(Class<T> opClass) { - - for (OpTuple o : opvec) { - if (o.descClass == opClass) { - try { - Operator<T> op = (Operator<T>) o.opClass.newInstance(); - return op; - } catch (Exception e) { - e.printStackTrace(); - throw new RuntimeException(e); - } + public static <T extends OperatorDesc> Operator<T> get( + CompilationOpContext cContext, Class<T> descClass) { + Preconditions.checkNotNull(cContext); + Class<?> opClass = opvec.get(descClass); + if (opClass != null) { + try { + return (Operator<T>)opClass.getDeclaredConstructor( + CompilationOpContext.class).newInstance(cContext); + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException(e); } } - throw new RuntimeException("No operator for descriptor class " - + opClass.getName()); + throw new RuntimeException("No operator for descriptor class " + descClass.getName()); } - public static <T extends OperatorDesc> Operator<T> get(Class<T> opClass, - RowSchema rwsch) { - - Operator<T> ret = get(opClass); - ret.setSchema(rwsch); - return ret; + /** + * Returns an operator given the conf and a list of children operators. + */ + public static <T extends OperatorDesc> Operator<T> get(CompilationOpContext cContext, T conf) { + Operator<T> ret = get(cContext, (Class<T>) conf.getClass()); + ret.setConf(conf); + return (ret); } /** * Returns an operator given the conf and a list of children operators. */ public static <T extends OperatorDesc> Operator<T> get(T conf, - Operator<? extends OperatorDesc>... oplist) { - Operator<T> ret = get((Class<T>) conf.getClass()); + Operator<? extends OperatorDesc> oplist0, Operator<? extends OperatorDesc>... oplist) { + Operator<T> ret = get(oplist0.getCompilationOpContext(), (Class<T>) conf.getClass()); ret.setConf(conf); + makeChild(ret, oplist0); makeChild(ret, oplist); return (ret); } @@ -253,27 +227,28 @@ public final class OperatorFactory { /** * Returns an operator given the conf and a list of children operators. */ - public static <T extends OperatorDesc> Operator<T> get(T conf, - RowSchema rwsch, Operator... oplist) { - Operator<T> ret = get(conf, oplist); + public static <T extends OperatorDesc> Operator<T> get( + CompilationOpContext cContext, T conf, RowSchema rwsch) { + Operator<T> ret = get(cContext, conf); ret.setSchema(rwsch); return (ret); } + /** * Returns an operator given the conf and a list of parent operators. */ - public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf, - Operator... oplist) { - Operator<T> ret = get((Class<T>) conf.getClass()); + public static <T extends OperatorDesc> Operator<T> getAndMakeChild( + T conf, Operator oplist0, Operator... oplist) { + Operator<T> ret = get(oplist0.getCompilationOpContext(), (Class<T>) conf.getClass()); ret.setConf(conf); - if (oplist.length == 0) { - return (ret); - } // Add the new operator as child of each of the passed in operators + List<Operator> children = oplist0.getChildOperators(); + children.add(ret); + oplist0.setChildOperators(children); for (Operator op : oplist) { - List<Operator> children = op.getChildOperators(); + children = op.getChildOperators(); children.add(ret); op.setChildOperators(children); } @@ -281,6 +256,7 @@ public final class OperatorFactory { // add parents for the newly created operator List<Operator<? extends OperatorDesc>> parent = new ArrayList<Operator<? extends OperatorDesc>>(); + parent.add(oplist0); for (Operator op : oplist) { parent.add(op); } @@ -293,9 +269,9 @@ public final class OperatorFactory { /** * Returns an operator given the conf and a list of parent operators. */ - public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf, - List<Operator<? extends OperatorDesc>> oplist) { - Operator<T> ret = get((Class<T>) conf.getClass()); + public static <T extends OperatorDesc> Operator<T> getAndMakeChild(CompilationOpContext cContext, + T conf, List<Operator<? extends OperatorDesc>> oplist) { + Operator<T> ret = get(cContext, (Class<T>) conf.getClass()); ret.setConf(conf); if (oplist.size() == 0) { return ret; @@ -322,9 +298,49 @@ public final class OperatorFactory { /** * Returns an operator given the conf and a list of parent operators. */ - public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf, - RowSchema rwsch, Operator... oplist) { - Operator<T> ret = getAndMakeChild(conf, oplist); + public static <T extends OperatorDesc> Operator<T> getAndMakeChild( + CompilationOpContext cContext, T conf, RowSchema rwsch) { + Operator<T> ret = get(cContext, (Class<T>) conf.getClass()); + ret.setConf(conf); + ret.setSchema(rwsch); + return ret; + } + + /** + * Returns an operator given the conf and a list of parent operators. + */ + public static <T extends OperatorDesc> Operator<T> getAndMakeChild( + CompilationOpContext ctx, T conf, RowSchema rwsch, Operator[] oplist) { + Operator<T> ret = get(ctx, (Class<T>) conf.getClass()); + ret.setConf(conf); + ret.setSchema(rwsch); + if (oplist.length == 0) return ret; + + // Add the new operator as child of each of the passed in operators + for (Operator op : oplist) { + List<Operator> children = op.getChildOperators(); + children.add(ret); + op.setChildOperators(children); + } + + // add parents for the newly created operator + List<Operator<? extends OperatorDesc>> parent = + new ArrayList<Operator<? extends OperatorDesc>>(); + for (Operator op : oplist) { + parent.add(op); + } + + ret.setParentOperators(parent); + + return (ret); + } + + /** + * Returns an operator given the conf and a list of parent operators. + */ + public static <T extends OperatorDesc> Operator<T> getAndMakeChild( + T conf, RowSchema rwsch, Operator oplist0, Operator... oplist) { + Operator<T> ret = getAndMakeChild(conf, oplist0, oplist); ret.setSchema(rwsch); return ret; } @@ -332,9 +348,9 @@ public final class OperatorFactory { /** * Returns an operator given the conf and a list of parent operators. */ - public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf, - RowSchema rwsch, Map<String, ExprNodeDesc> colExprMap, Operator... oplist) { - Operator<T> ret = getAndMakeChild(conf, rwsch, oplist); + public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf, RowSchema rwsch, + Map<String, ExprNodeDesc> colExprMap, Operator oplist0, Operator... oplist) { + Operator<T> ret = getAndMakeChild(conf, rwsch, oplist0, oplist); ret.setColumnExprMap(colExprMap); return (ret); } @@ -342,9 +358,9 @@ public final class OperatorFactory { /** * Returns an operator given the conf and a list of parent operators. */ - public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf, - RowSchema rwsch, List<Operator<? extends OperatorDesc>> oplist) { - Operator<T> ret = getAndMakeChild(conf, oplist); + public static <T extends OperatorDesc> Operator<T> getAndMakeChild(CompilationOpContext cContext, + T conf, RowSchema rwsch, List<Operator<? extends OperatorDesc>> oplist) { + Operator<T> ret = getAndMakeChild(cContext, conf, oplist); ret.setSchema(rwsch); return (ret); } @@ -352,9 +368,10 @@ public final class OperatorFactory { /** * Returns an operator given the conf and a list of parent operators. */ - public static <T extends OperatorDesc> Operator<T> getAndMakeChild(T conf, - RowSchema rwsch, Map<String, ExprNodeDesc> colExprMap, List<Operator<? extends OperatorDesc>> oplist) { - Operator<T> ret = getAndMakeChild(conf, rwsch, oplist); + public static <T extends OperatorDesc> Operator<T> getAndMakeChild(CompilationOpContext cContext, + T conf, RowSchema rwsch, Map<String, ExprNodeDesc> colExprMap, + List<Operator<? extends OperatorDesc>> oplist) { + Operator<T> ret = getAndMakeChild(cContext, conf, rwsch, oplist); ret.setColumnExprMap(colExprMap); return (ret); } http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java index 2c9deac..445cf3d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec; import java.io.IOException; import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.io.orc.Writer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +57,15 @@ public class OrcFileMergeOperator extends private Reader reader; private FSDataInputStream fdis; + /** Kryo ctor. */ + protected OrcFileMergeOperator() { + super(); + } + + public OrcFileMergeOperator(CompilationOpContext ctx) { + super(ctx); + } + @Override public void process(Object row, int tag) throws HiveException { Object[] keyValue = (Object[]) row; http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java index 113ac21..2e9e539 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java @@ -26,6 +26,7 @@ import java.util.Stack; import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.PTFPartition.PTFPartitionIterator; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; @@ -60,6 +61,15 @@ public class PTFOperator extends Operator<PTFDesc> implements Serializable { transient Configuration hiveConf; transient PTFInvocation ptfInvocation; + /** Kryo ctor. */ + protected PTFOperator() { + super(); + } + + public PTFOperator(CompilationOpContext ctx) { + super(ctx); + } + /* * 1. Find out if the operator is invoked at Map-Side or Reduce-side * 2. Get the deserialized QueryDef http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java index c34454c..4dea1d2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/RCFileMergeOperator.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.io.RCFile; import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileKeyBufferWrapper; @@ -36,12 +37,22 @@ import java.io.IOException; */ public class RCFileMergeOperator extends AbstractFileMergeOperator<RCFileMergeDesc> { + public final static Logger LOG = LoggerFactory.getLogger("RCFileMergeMapper"); RCFile.Writer outWriter; CompressionCodec codec = null; int columnNumber = 0; + /** Kryo ctor. */ + protected RCFileMergeOperator() { + super(); + } + + public RCFileMergeOperator(CompilationOpContext ctx) { + super(ctx); + } + @Override public void process(Object row, int tag) throws HiveException { Object[] keyValue = (Object[]) row; http://git-wip-us.apache.org/repos/asf/hive/blob/8271c63d/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java index 4b65952..74b4802 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java @@ -31,6 +31,7 @@ import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -151,6 +152,15 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc> protected transient long logEveryNRows = 0; private final transient LongWritable recordCounter = new LongWritable(); + /** Kryo ctor. */ + protected ReduceSinkOperator() { + super(); + } + + public ReduceSinkOperator(CompilationOpContext ctx) { + super(ctx); + } + @Override protected void initializeOp(Configuration hconf) throws HiveException { super.initializeOp(hconf);