http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterStringOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterStringOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterStringOperator.java
index e4107ff..8ed1ed4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterStringOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterStringOperator.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.JoinUtil;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -67,12 +68,18 @@ public class VectorMapJoinOuterStringOperator extends VectorMapJoinOuterGenerate
   // Pass-thru constructors.
   //
 
-  public VectorMapJoinOuterStringOperator() {
+  /** Kryo ctor. */
+  protected VectorMapJoinOuterStringOperator() {
     super();
   }
 
-  public VectorMapJoinOuterStringOperator(VectorizationContext vContext, OperatorDesc conf) throws HiveException {
-    super(vContext, conf);
+  public VectorMapJoinOuterStringOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
+  public VectorMapJoinOuterStringOperator(CompilationOpContext ctx,
+      VectorizationContext vContext, OperatorDesc conf) throws HiveException {
+    super(ctx, vContext, conf);
   }
 
   //---------------------------------------------------------------------------
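The hunk above is the template repeated across every vectorized operator in this commit: the public no-arg constructor becomes a protected one reserved for Kryo deserialization, a constructor taking only a CompilationOpContext is added, and the pass-thru constructor gains a leading context parameter that is forwarded to the superclass. A minimal caller-side sketch of how the three constructors divide up; the wrapper class and method here are illustrative, not part of the commit:

  import org.apache.hadoop.hive.ql.CompilationOpContext;
  import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
  import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterStringOperator;
  import org.apache.hadoop.hive.ql.metadata.HiveException;
  import org.apache.hadoop.hive.ql.plan.OperatorDesc;

  class CtorPatternSketch {
    VectorMapJoinOuterStringOperator create(VectorizationContext vContext, OperatorDesc conf)
        throws HiveException {
      // One CompilationOpContext is shared by the operators of a single compilation.
      CompilationOpContext ctx = new CompilationOpContext();
      // Planner code uses the full pass-thru constructor; the protected no-arg
      // "Kryo ctor" is reserved for deserialization and is no longer reachable
      // from user code.
      return new VectorMapJoinOuterStringOperator(ctx, vContext, conf);
    }
  }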
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
index a79a649..7bdd11a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkCommonOperator.java
@@ -25,6 +25,7 @@ import java.util.Properties;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.TerminalOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -143,13 +144,18 @@ public abstract class VectorReduceSinkCommonOperator extends TerminalOperator<Re
 
   //---------------------------------------------------------------------------
 
-  public VectorReduceSinkCommonOperator() {
+  /** Kryo ctor. */
+  protected VectorReduceSinkCommonOperator() {
     super();
   }
 
-  public VectorReduceSinkCommonOperator(VectorizationContext vContext, OperatorDesc conf)
-      throws HiveException {
-    super();
+  public VectorReduceSinkCommonOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
+  public VectorReduceSinkCommonOperator(CompilationOpContext ctx,
+      VectorizationContext vContext, OperatorDesc conf) throws HiveException {
+    this(ctx);
 
     ReduceSinkDesc desc = (ReduceSinkDesc) conf;
     this.conf = desc;

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java
index cec5660..325f773 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkLongOperator.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec.vector.reducesink;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
 import org.apache.hadoop.hive.ql.exec.vector.keyseries.VectorKeySeriesLongSerialized;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -49,13 +50,18 @@ public class VectorReduceSinkLongOperator extends VectorReduceSinkCommonOperator
   // Pass-thru constructors.
   //
 
-  public VectorReduceSinkLongOperator() {
+  /** Kryo ctor. */
+  protected VectorReduceSinkLongOperator() {
     super();
   }
 
-  public VectorReduceSinkLongOperator(VectorizationContext vContext, OperatorDesc conf)
-      throws HiveException {
-    super(vContext, conf);
+  public VectorReduceSinkLongOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
+  public VectorReduceSinkLongOperator(CompilationOpContext ctx,
+      VectorizationContext vContext, OperatorDesc conf) throws HiveException {
+    super(ctx, vContext, conf);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java
index a4ef66b..2027187 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkMultiKeyOperator.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec.vector.reducesink;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
 import org.apache.hadoop.hive.ql.exec.vector.keyseries.VectorKeySeriesMultiSerialized;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -45,13 +46,18 @@ public class VectorReduceSinkMultiKeyOperator extends VectorReduceSinkCommonOper
   // Pass-thru constructors.
   //
 
-  public VectorReduceSinkMultiKeyOperator() {
+  /** Kryo ctor. */
+  protected VectorReduceSinkMultiKeyOperator() {
     super();
   }
 
-  public VectorReduceSinkMultiKeyOperator(VectorizationContext vContext, OperatorDesc conf)
-      throws HiveException {
-    super(vContext, conf);
+  public VectorReduceSinkMultiKeyOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
+  public VectorReduceSinkMultiKeyOperator(CompilationOpContext ctx,
+      VectorizationContext vContext, OperatorDesc conf) throws HiveException {
+    super(ctx, vContext, conf);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java
index b6cb527..b655e6e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/reducesink/VectorReduceSinkStringOperator.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec.vector.reducesink;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
 import org.apache.hadoop.hive.ql.exec.vector.keyseries.VectorKeySeriesBytesSerialized;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -48,13 +49,18 @@ public class VectorReduceSinkStringOperator extends VectorReduceSinkCommonOperat
   // Pass-thru constructors.
   //
 
-  public VectorReduceSinkStringOperator() {
+  /** Kryo ctor. */
+  protected VectorReduceSinkStringOperator() {
     super();
   }
 
-  public VectorReduceSinkStringOperator(VectorizationContext vContext, OperatorDesc conf)
-      throws HiveException {
-    super(vContext, conf);
+  public VectorReduceSinkStringOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
+  public VectorReduceSinkStringOperator(CompilationOpContext ctx,
+      VectorizationContext vContext, OperatorDesc conf) throws HiveException {
+    super(ctx, vContext, conf);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
index 38b6b5d..0be8b3c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
@@ -135,7 +135,7 @@ public class ATSHook implements ExecuteWithHookContext {
                 );
               @SuppressWarnings("unchecked")
               ExplainTask explain = (ExplainTask) TaskFactory.get(work, conf);
-              explain.initialize(conf, plan, null);
+              explain.initialize(conf, plan, null, null);
               String query = plan.getQueryStr();
               JSONObject explainPlan = explain.getJSONPlan(null, work);
               String logID = conf.getLogIdVar(SessionState.get().getSessionId());

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
index e23a969..82629c1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
@@ -22,6 +22,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
@@ -60,8 +61,8 @@ public class MergeFileTask extends Task<MergeFileWork> implements Serializable,
 
   @Override
   public void initialize(HiveConf conf, QueryPlan queryPlan,
-      DriverContext driverContext) {
-    super.initialize(conf, queryPlan, driverContext);
+      DriverContext driverContext, CompilationOpContext opContext) {
+    super.initialize(conf, queryPlan, driverContext, opContext);
     job = new JobConf(conf, MergeFileTask.class);
     jobExecHelper = new HadoopJobExecHelper(job, this.console, this, this);
   }
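ATSHook and the Task subclasses here show the second half of the change: Task.initialize() now takes a CompilationOpContext as its fourth argument, and each override simply forwards it to super. Callers that build a task outside a normal compilation supply a fresh context, as in this sketch distilled from the PartialScanTask hunk below; the wrapper method is illustrative:

  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.ql.CompilationOpContext;
  import org.apache.hadoop.hive.ql.DriverContext;
  import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanTask;
  import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;

  class TaskInitSketch {
    static PartialScanTask newInitializedTask(HiveConf hiveConf, PartialScanWork work) {
      DriverContext driverCxt = new DriverContext();
      PartialScanTask taskExec = new PartialScanTask();
      // Standalone callers now supply a CompilationOpContext of their own;
      // callers inside the compiler pass the context of a nearby operator instead.
      taskExec.initialize(hiveConf, null, driverCxt, new CompilationOpContext());
      taskExec.setWork(work);
      return taskExec;
    }
  }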
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
index 829a9f6..71371a3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.StatsSetupConst.StatDB;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -84,8 +85,8 @@ public class PartialScanTask extends Task<PartialScanWork> implements
 
   @Override
   public void initialize(HiveConf conf, QueryPlan queryPlan,
-      DriverContext driverContext) {
-    super.initialize(conf, queryPlan, driverContext);
+      DriverContext driverContext, CompilationOpContext opContext) {
+    super.initialize(conf, queryPlan, driverContext, opContext);
     job = new JobConf(conf, PartialScanTask.class);
     jobExecHelper = new HadoopJobExecHelper(job, this.console, this, this);
   }
@@ -351,7 +352,7 @@ public class PartialScanTask extends Task<PartialScanWork> implements
     PartialScanWork mergeWork = new PartialScanWork(inputPaths);
     DriverContext driverCxt = new DriverContext();
     PartialScanTask taskExec = new PartialScanTask();
-    taskExec.initialize(hiveConf, null, driverCxt);
+    taskExec.initialize(hiveConf, null, driverCxt, new CompilationOpContext());
     taskExec.setWork(mergeWork);
     int ret = taskExec.execute(driverCxt);

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
index 34c067a..bc21da0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
@@ -59,8 +60,8 @@ public class ColumnTruncateTask extends Task<ColumnTruncateWork> implements Seri
 
   @Override
   public void initialize(HiveConf conf, QueryPlan queryPlan,
-      DriverContext driverContext) {
-    super.initialize(conf, queryPlan, driverContext);
+      DriverContext driverContext, CompilationOpContext opContext) {
+    super.initialize(conf, queryPlan, driverContext, opContext);
     job = new JobConf(conf, ColumnTruncateTask.class);
     jobExecHelper = new HadoopJobExecHelper(job, this.console, this, this);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
index 1da0dda..b57dc77 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
@@ -185,7 +185,7 @@ abstract public class AbstractSMBJoinProc extends AbstractBucketJoinProc impleme
       par.getChildOperators().add(index, smbJop);
     } else {
-      DummyStoreOperator dummyStoreOp = new DummyStoreOperator();
+      DummyStoreOperator dummyStoreOp = new DummyStoreOperator(par.getCompilationOpContext());
       par.getChildOperators().add(index, dummyStoreOp);
       List<Operator<? extends OperatorDesc>> childrenOps =

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index ea89cf0..00bc193 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -245,8 +245,9 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     }
 
     CommonMergeJoinOperator mergeJoinOp =
-        (CommonMergeJoinOperator) OperatorFactory.get(new CommonMergeJoinDesc(numBuckets,
-            mapJoinConversionPos, mapJoinDesc), joinOp.getSchema());
+        (CommonMergeJoinOperator) OperatorFactory.get(joinOp.getCompilationOpContext(),
+            new CommonMergeJoinDesc(numBuckets, mapJoinConversionPos, mapJoinDesc),
+            joinOp.getSchema());
     OpTraits opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(), numBuckets, joinOp.getOpTraits()
         .getSortCols());
@@ -295,7 +296,8 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     }
 
     // insert the dummy store operator here
-    DummyStoreOperator dummyStoreOp = new TezDummyStoreOperator();
+    DummyStoreOperator dummyStoreOp = new TezDummyStoreOperator(
+        mergeJoinOp.getCompilationOpContext());
     dummyStoreOp.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>());
     dummyStoreOp.setChildOperators(new ArrayList<Operator<? extends OperatorDesc>>());
     dummyStoreOp.getChildOperators().add(mergeJoinOp);

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
index 8d232ac..60240bd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
@@ -315,8 +315,8 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
         new ArrayList<AggregationDesc>(), false, groupByMemoryUsage, memoryThreshold,
         null, false, 0, true);
 
-    GroupByOperator groupByOp =
-        (GroupByOperator) OperatorFactory.getAndMakeChild(groupBy, selectOp);
+    GroupByOperator groupByOp = (GroupByOperator) OperatorFactory.getAndMakeChild(
+        groupBy, selectOp);
 
     Map<String, ExprNodeDesc> colMap = new HashMap<String, ExprNodeDesc>();
     colMap.put(outputNames.get(0), groupByExpr);
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
index 2160e01..9c979be 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
@@ -204,7 +204,7 @@ public class GenMRTableScan1 implements NodeProcessor {
           // partial scan task
           DriverContext driverCxt = new DriverContext();
           Task<PartialScanWork> psTask = TaskFactory.get(scanWork, parseCtx.getConf());
-          psTask.initialize(parseCtx.getConf(), null, driverCxt);
+          psTask.initialize(parseCtx.getConf(), null, driverCxt, op.getCompilationOpContext());
           psTask.setWork(scanWork);
 
           // task dependency

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 9cbd496..3500711 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
@@ -993,9 +994,10 @@ public final class GenMapRedUtils {
     return mrWork;
   }
 
-  public static TableScanOperator createTemporaryTableScanOperator(RowSchema rowSchema) {
+  public static TableScanOperator createTemporaryTableScanOperator(
+      CompilationOpContext ctx, RowSchema rowSchema) {
     TableScanOperator tableScanOp =
-        (TableScanOperator) OperatorFactory.get(new TableScanDesc(null), rowSchema);
+        (TableScanOperator) OperatorFactory.get(ctx, new TableScanDesc(null), rowSchema);
     // Set needed columns for this dummy TableScanOperator
     List<Integer> neededColumnIds = new ArrayList<Integer>();
     List<String> neededColumnNames = new ArrayList<String>();
@@ -1038,7 +1040,7 @@ public final class GenMapRedUtils {
           HiveConf.ConfVars.COMPRESSINTERMEDIATETYPE));
     }
     Operator<? extends OperatorDesc> fileSinkOp = OperatorFactory.get(
-        desc, parent.getSchema());
+        parent.getCompilationOpContext(), desc, parent.getSchema());
 
     // Connect parent to fileSinkOp
     parent.replaceChild(child, fileSinkOp);
@@ -1046,7 +1048,7 @@ public final class GenMapRedUtils {
 
     // Create a dummy TableScanOperator for the file generated through fileSinkOp
     TableScanOperator tableScanOp = createTemporaryTableScanOperator(
-        parent.getSchema());
+        parent.getCompilationOpContext(), parent.getSchema());
 
     // Connect this TableScanOperator to child.
     tableScanOp.setChildOperators(Utilities.makeList(child));
@@ -1272,8 +1274,8 @@ public final class GenMapRedUtils {
     // Create a TableScan operator
     RowSchema inputRS = fsInput.getSchema();
-    TableScanOperator tsMerge =
-        GenMapRedUtils.createTemporaryTableScanOperator(inputRS);
+    TableScanOperator tsMerge = GenMapRedUtils.createTemporaryTableScanOperator(
+        fsInput.getCompilationOpContext(), inputRS);
 
     // Create a FileSink operator
     TableDesc ts = (TableDesc) fsInputDesc.getTableInfo().clone();
@@ -1324,7 +1326,7 @@ public final class GenMapRedUtils {
         fsInputDesc.getTableInfo().getInputFileFormatClass().equals(OrcInputFormat.class))) {
 
       cplan = GenMapRedUtils.createMergeTask(fsInputDesc, finalName,
-          dpCtx != null && dpCtx.getNumDPCols() > 0);
+          dpCtx != null && dpCtx.getNumDPCols() > 0, fsInput.getCompilationOpContext());
       if (conf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
         work = new TezWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
         cplan.setName("File Merge");
@@ -1560,12 +1562,13 @@ public final class GenMapRedUtils {
    *
   * @param fsInputDesc
   * @param finalName
+   * @param ctx
   * @param inputFormatClass
   * @return MergeWork if table is stored as RCFile or ORCFile,
   *         null otherwise
   */
-  public static MapWork createMergeTask(FileSinkDesc fsInputDesc,
-      Path finalName, boolean hasDynamicPartitions) throws SemanticException {
+  public static MapWork createMergeTask(FileSinkDesc fsInputDesc, Path finalName,
+      boolean hasDynamicPartitions, CompilationOpContext ctx) throws SemanticException {
 
     Path inputDir = fsInputDesc.getFinalDirName();
     TableDesc tblDesc = fsInputDesc.getTableInfo();
@@ -1621,7 +1624,7 @@ public final class GenMapRedUtils {
       int lbLevel = work.getListBucketingCtx() == null ? 0 :
         work.getListBucketingCtx().calculateListBucketingLevel();
       fmd.setListBucketingDepth(lbLevel);
-      mergeOp = OperatorFactory.get(fmd);
+      mergeOp = OperatorFactory.get(ctx, fmd);
       aliasToWork.put(inputDir.toString(), mergeOp);
       work.setAliasToWork(aliasToWork);

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
index 8f3ce60..3233157 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
@@ -571,8 +571,8 @@ public class GroupByOptimizer extends Transform {
         colName.add(cInfo.getInternalName());
         columnExprMap.put(cInfo.getInternalName(), column);
       }
-      return OperatorFactory.getAndMakeChild(new SelectDesc(columns, colName), new RowSchema(currOp
-          .getSchema().getSignature()), columnExprMap, parentOp);
+      return OperatorFactory.getAndMakeChild(new SelectDesc(columns, colName),
+          new RowSchema(currOp.getSchema().getSignature()), columnExprMap, parentOp);
     }
   }
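Note the calling convention in the GenMapRedUtils hunks above: helpers that fabricate operators, such as createTemporaryTableScanOperator() and createMergeTask(), never construct a context themselves; they receive one pulled from an operator already in the plan. A sketch of the caller side, assuming only what the hunks show (the wrapper class is illustrative):

  import org.apache.hadoop.hive.ql.CompilationOpContext;
  import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
  import org.apache.hadoop.hive.ql.exec.TableScanOperator;
  import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;

  class MergeScanSketch {
    static TableScanOperator tsForMerge(FileSinkOperator fsInput) throws Exception {
      // The context is borrowed from an operator that already belongs to the
      // plan being compiled, never created fresh inside the helper.
      CompilationOpContext ctx = fsInput.getCompilationOpContext();
      return GenMapRedUtils.createTemporaryTableScanOperator(ctx, fsInput.getSchema());
    }
  }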
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
index 2e3f930..e577e19 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
@@ -80,6 +80,8 @@ import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
+import com.clearspring.analytics.util.Lists;
+
 /**
  * Implementation of one of the rule-based map join optimization. User passes hints to specify
  * map-joins and during this optimization, all user specified map joins are converted to MapJoins -
@@ -376,7 +378,8 @@ public class MapJoinProcessor extends Transform {
     RowSchema outputRS = op.getSchema();
 
     MapJoinOperator mapJoinOp = (MapJoinOperator) OperatorFactory.getAndMakeChild(
-        mapJoinDescriptor, new RowSchema(outputRS.getSignature()), op.getParentOperators());
+        op.getCompilationOpContext(), mapJoinDescriptor,
+        new RowSchema(outputRS.getSignature()), op.getParentOperators());
 
     mapJoinOp.getConf().setReversedExprs(op.getConf().getReversedExprs());
     Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
@@ -438,7 +441,8 @@ public class MapJoinProcessor extends Transform {
     RowSchema joinRS = smbJoinOp.getSchema();
     // The mapjoin has the same schema as the join operator
     MapJoinOperator mapJoinOp = (MapJoinOperator) OperatorFactory.getAndMakeChild(
-        mapJoinDesc, joinRS, new ArrayList<Operator<? extends OperatorDesc>>());
+        smbJoinOp.getCompilationOpContext(), mapJoinDesc, joinRS,
+        new ArrayList<Operator<? extends OperatorDesc>>());
 
     // change the children of the original join operator to point to the map
     // join operator
@@ -601,8 +605,8 @@ public class MapJoinProcessor extends Transform {
 
     SelectDesc select = new SelectDesc(exprs, outputs, false);
 
-    SelectOperator sel = (SelectOperator) OperatorFactory.getAndMakeChild(select,
-        new RowSchema(outputRS), input);
+    SelectOperator sel = (SelectOperator) OperatorFactory.getAndMakeChild(
+        select, new RowSchema(outputRS), input);
 
     sel.setColumnExprMap(colExprMap);

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
index d5c3a2d..c38c6d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
@@ -297,6 +297,7 @@ public class ReduceSinkMapJoinProc implements NodeProcessor {
         LOG.debug("Cloning reduce sink for multi-child broadcast edge");
         // we've already set this one up. Need to clone for the next work.
         r = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
+            parentRS.getCompilationOpContext(),
             (ReduceSinkDesc) parentRS.getConf().clone(),
             new RowSchema(parentRS.getSchema()),
             parentRS.getParentOperators());
@@ -334,7 +335,8 @@ public class ReduceSinkMapJoinProc implements NodeProcessor {
     // create an new operator: HashTableDummyOperator, which share the table desc
     HashTableDummyDesc desc = new HashTableDummyDesc();
     @SuppressWarnings("unchecked")
-    HashTableDummyOperator dummyOp = (HashTableDummyOperator) OperatorFactory.get(desc);
+    HashTableDummyOperator dummyOp = (HashTableDummyOperator) OperatorFactory.get(
+        parentRS.getCompilationOpContext(), desc);
     TableDesc tbl;
 
     // need to create the correct table descriptor for key/value

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchAggregation.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchAggregation.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchAggregation.java
index f71fd3f..fbcb779 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchAggregation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchAggregation.java
@@ -121,7 +121,8 @@ public class SimpleFetchAggregation extends Transform {
 
     // Create a file sink operator for this file name
     FileSinkDesc desc = new FileSinkDesc(fileName, tsDesc, false);
-    FileSinkOperator newFS = (FileSinkOperator) OperatorFactory.get(desc, parent.getSchema());
+    FileSinkOperator newFS = (FileSinkOperator) OperatorFactory.get(
+        parent.getCompilationOpContext(), desc, parent.getSchema());
     newFS.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>());
     newFS.getParentOperators().add(parent);

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
index 719dfff..b5ceb14 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
@@ -474,7 +474,8 @@ public class SimpleFetchOptimizer extends Transform {
 
   public static ListSinkOperator replaceFSwithLS(Operator<?> fileSink, String nullFormat) {
     ListSinkDesc desc = new ListSinkDesc(nullFormat);
-    ListSinkOperator sink = (ListSinkOperator) OperatorFactory.get(desc);
+    ListSinkOperator sink = (ListSinkOperator) OperatorFactory.get(
+        fileSink.getCompilationOpContext(), desc);
 
     sink.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>());
     Operator<? extends OperatorDesc> parent = fileSink.getParentOperators().get(0);
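A detail worth noticing in MapJoinProcessor and the surrounding files: only the parent-less OperatorFactory calls gained an explicit context argument, while calls that already pass a parent operator (from which a context can presumably be inferred) were merely reflowed. A sketch contrasting the two shapes; the wrapper names are illustrative and the inference behavior is an assumption, not something this diff states:

  import org.apache.hadoop.hive.ql.exec.Operator;
  import org.apache.hadoop.hive.ql.exec.OperatorFactory;
  import org.apache.hadoop.hive.ql.exec.RowSchema;
  import org.apache.hadoop.hive.ql.plan.OperatorDesc;
  import org.apache.hadoop.hive.ql.plan.SelectDesc;

  class FactoryOverloadSketch {
    // With a parent supplied, getAndMakeChild is left without a context argument
    // here -- presumably it can take the context from the parent...
    static Operator<? extends OperatorDesc> withParent(
        SelectDesc select, RowSchema schema, Operator<? extends OperatorDesc> input) {
      return OperatorFactory.getAndMakeChild(select, schema, input);
    }
    // ...whereas parent-less creations (for example, an empty parent list in the
    // SMB-to-MapJoin conversion above) must pass a context explicitly, taken
    // from a neighboring operator via getCompilationOpContext().
  }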
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
index 3f1b277..5e30910 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
@@ -219,7 +219,7 @@ public class SkewJoinOptimizer extends Transform {
     oplist.add(currOp);
     oplist.add(currOpClone);
     Operator<? extends OperatorDesc> unionOp =
-      OperatorFactory.getAndMakeChild(
+      OperatorFactory.getAndMakeChild(currOp.getCompilationOpContext(),
         new UnionDesc(), new RowSchema(currOp.getSchema().getSignature()), oplist);
 
     // Introduce a select after the union
@@ -228,8 +228,7 @@ public class SkewJoinOptimizer extends Transform {
     unionList.add(unionOp);
 
     Operator<? extends OperatorDesc> selectUnionOp =
-      OperatorFactory.getAndMakeChild(
-        new SelectDesc(true),
+      OperatorFactory.getAndMakeChild(currOp.getCompilationOpContext(), new SelectDesc(true),
        new RowSchema(unionOp.getSchema().getSignature()), unionList);
 
     // add the finalOp after the union
@@ -475,8 +474,7 @@ public class SkewJoinOptimizer extends Transform {
       Operator<FilterDesc> filter = OperatorFactory.getAndMakeChild(
        new FilterDesc(filterExpr, false),
-        new RowSchema(tableScanOp.getSchema().getSignature()),
-        tableScanOp);
+        new RowSchema(tableScanOp.getSchema().getSignature()), tableScanOp);
       OperatorFactory.makeChild(filter, currChild);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveGBOpConvUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveGBOpConvUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveGBOpConvUtil.java
index a6d809b..7fbf8cd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveGBOpConvUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveGBOpConvUtil.java
@@ -685,9 +685,9 @@ public class HiveGBOpConvUtil {
 
     ReduceSinkOperator rsOp = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(PlanUtils
         .getReduceSinkDesc(reduceKeys, keyLength, reduceValues, gbInfo.distColIndices,
-            outputKeyColumnNames, outputValueColumnNames, true, -1,
-            getNumPartFieldsForMapSideRS(gbInfo), getParallelismForMapSideRS(gbInfo),
-            AcidUtils.Operation.NOT_ACID), new RowSchema(colInfoLst), mapGB);
+            outputKeyColumnNames, outputValueColumnNames, true, -1, getNumPartFieldsForMapSideRS(
+                gbInfo), getParallelismForMapSideRS(gbInfo), AcidUtils.Operation.NOT_ACID),
+        new RowSchema(colInfoLst), mapGB);
 
     rsOp.setColumnExprMap(colExprMap);
@@ -945,8 +945,8 @@ public class HiveGBOpConvUtil {
        && !(gbInfo.gbPhysicalPipelineMode == HIVEGBPHYSICALMODE.MAP_SIDE_GB_SKEW_GBKEYS_OR_DIST_UDAF_PRESENT);
     Operator rsGBOp = OperatorFactory.getAndMakeChild(new GroupByDesc(gbMode, outputColNames,
         gbKeys, aggregations, gbInfo.groupByMemoryUsage, gbInfo.memoryThreshold, gbInfo.grpSets,
-        includeGrpSetInGBDesc, groupingSetsColPosition,
-        gbInfo.containsDistinctAggr), new RowSchema(colInfoLst), rs);
+        includeGrpSetInGBDesc, groupingSetsColPosition, gbInfo.containsDistinctAggr),
+        new RowSchema(colInfoLst), rs);
 
     rsGBOp.setColumnExprMap(colExprMap);

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
index 00f1acb..c79b1be 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
@@ -253,7 +253,8 @@ public class HiveOpConverter {
     tsd.setNeededColumns(neededColumnNames);
 
     // 2. Setup TableScan
-    TableScanOperator ts = (TableScanOperator) OperatorFactory.get(tsd, new RowSchema(colInfos));
+    TableScanOperator ts = (TableScanOperator) OperatorFactory.get(
+        semanticAnalyzer.getOpContext(), tsd, new RowSchema(colInfos));
 
     topOps.put(tableAlias, ts);
@@ -488,8 +489,7 @@ public class HiveOpConverter {
         && semanticAnalyzer.getQB().getParseInfo() != null)
       this.semanticAnalyzer.getQB().getParseInfo().setOuterQueryLimit(limit);
     ArrayList<ColumnInfo> cinfoLst = createColInfos(resultOp);
-    resultOp = OperatorFactory.getAndMakeChild(limitDesc,
-        new RowSchema(cinfoLst), resultOp);
+    resultOp = OperatorFactory.getAndMakeChild(limitDesc, new RowSchema(cinfoLst), resultOp);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Generated " + resultOp + " with row schema: [" + resultOp.getSchema() + "]");
@@ -516,8 +516,8 @@ public class HiveOpConverter {
         filterRel.getCluster().getTypeFactory()));
     FilterDesc filDesc = new FilterDesc(filCondExpr, false);
     ArrayList<ColumnInfo> cinfoLst = createColInfos(inputOpAf.inputs.get(0));
-    FilterOperator filOp = (FilterOperator) OperatorFactory.getAndMakeChild(filDesc, new RowSchema(
-        cinfoLst), inputOpAf.inputs.get(0));
+    FilterOperator filOp = (FilterOperator) OperatorFactory.getAndMakeChild(filDesc,
+        new RowSchema(cinfoLst), inputOpAf.inputs.get(0));
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Generated " + filOp + " with row schema: [" + filOp.getSchema() + "]");
@@ -569,8 +569,8 @@ public class HiveOpConverter {
         children[i] = genInputSelectForUnion(op, cinfoLst);
       }
     }
-    Operator<? extends OperatorDesc> unionOp = OperatorFactory.getAndMakeChild(unionDesc,
-        new RowSchema(cinfoLst), children);
+    Operator<? extends OperatorDesc> unionOp = OperatorFactory.getAndMakeChild(
+        semanticAnalyzer.getOpContext(), unionDesc, new RowSchema(cinfoLst), children);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Generated " + unionOp + " with row schema: [" + unionOp.getSchema() + "]");
@@ -662,8 +662,8 @@ public class HiveOpConverter {
         unparseTranslator);
     RowResolver ptfOpRR = ptfDesc.getFuncDef().getOutputShape().getRr();
 
-    Operator<?> ptfOp = OperatorFactory.getAndMakeChild(ptfDesc,
-        new RowSchema(ptfOpRR.getColumnInfos()), selectOp);
+    Operator<?> ptfOp = OperatorFactory.getAndMakeChild(
+        ptfDesc, new RowSchema(ptfOpRR.getColumnInfos()), selectOp);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Generated " + ptfOp + " with row schema: [" + ptfOp.getSchema() + "]");
@@ -725,8 +725,8 @@ public class HiveOpConverter {
     SelectDesc selectDesc = new SelectDesc(new ArrayList<ExprNodeDesc>(descriptors.values()),
         new ArrayList<String>(descriptors.keySet()));
     ArrayList<ColumnInfo> cinfoLst = createColInfosSubset(input, keepColNames);
-    SelectOperator selectOp = (SelectOperator) OperatorFactory.getAndMakeChild(selectDesc,
-        new RowSchema(cinfoLst), rsOp);
+    SelectOperator selectOp = (SelectOperator) OperatorFactory.getAndMakeChild(
+        selectDesc, new RowSchema(cinfoLst), rsOp);
     selectOp.setColumnExprMap(descriptors);
 
     if (LOG.isDebugEnabled()) {
@@ -823,8 +823,8 @@ public class HiveOpConverter {
           partitionCols, order, numReducers, acidOperation);
     }
 
-    ReduceSinkOperator rsOp = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(rsDesc,
-        new RowSchema(outputColumns), input);
+    ReduceSinkOperator rsOp = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
+        rsDesc, new RowSchema(outputColumns), input);
 
     List<String> keyColNames = rsDesc.getOutputKeyColumnNames();
     for (int i = 0; i < keyColNames.size(); i++) {
@@ -976,8 +976,8 @@ public class HiveOpConverter {
     desc.setReversedExprs(reversedExprs);
     desc.setFilterMap(filterMap);
 
-    JoinOperator joinOp = (JoinOperator) OperatorFactory.getAndMakeChild(desc, new RowSchema(
-        outputColumns), childOps);
+    JoinOperator joinOp = (JoinOperator) OperatorFactory.getAndMakeChild(
+        childOps[0].getCompilationOpContext(), desc, new RowSchema(outputColumns), childOps);
     joinOp.setColumnExprMap(colExprMap);
     joinOp.setPosToAliasMap(posToAliasMap);
@@ -1241,8 +1241,8 @@ public class HiveOpConverter {
       columnExprMap.put(uInfo.getInternalName(), column);
     }
     if (needSelectOp) {
-      return OperatorFactory.getAndMakeChild(new SelectDesc(columns, colName), new RowSchema(
-          uColumnInfo), columnExprMap, origInputOp);
+      return OperatorFactory.getAndMakeChild(new SelectDesc(
+          columns, colName), new RowSchema(uColumnInfo), columnExprMap, origInputOp);
     } else {
       return origInputOp;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java
index 315a650..c8aa48c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/QueryPlanTreeTransformation.java
@@ -27,6 +27,7 @@ import java.util.Set;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.DemuxOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -123,7 +124,11 @@ public class QueryPlanTreeTransformation {
     Map<ReduceSinkOperator, Integer> bottomRSToNewTag =
         new HashMap<ReduceSinkOperator, Integer>();
     int newTag = 0;
+    CompilationOpContext opCtx = null;
     for (ReduceSinkOperator rsop: bottomReduceSinkOperators) {
+      if (opCtx == null) {
+        opCtx = rsop.getCompilationOpContext();
+      }
       rsop.getConf().setNumReducers(numReducers);
       bottomRSToNewTag.put(rsop, newTag);
       parentRSsOfDemux.add(rsop);
@@ -150,7 +155,7 @@ public class QueryPlanTreeTransformation {
             childIndexToOriginalNumParents,
             keysSerializeInfos,
             valuessSerializeInfos);
-    Operator<? extends OperatorDesc> demuxOp = OperatorFactory.get(demuxDesc);
+    Operator<? extends OperatorDesc> demuxOp = OperatorFactory.get(opCtx, demuxDesc);
     demuxOp.setChildOperators(childrenOfDemux);
     demuxOp.setParentOperators(parentRSsOfDemux);
     for (Operator<? extends OperatorDesc> child: childrenOfDemux) {
@@ -199,7 +204,7 @@ public class QueryPlanTreeTransformation {
             CorrelationUtilities.getSingleParent(childOP, true);
         parentsOfMux.add(parentOp);
         Operator<? extends OperatorDesc> mux = OperatorFactory.get(
-            new MuxDesc(parentsOfMux));
+            childOP.getCompilationOpContext(), new MuxDesc(parentsOfMux));
         mux.setChildOperators(Utilities.makeList(childOP));
         mux.setParentOperators(parentsOfMux);
         childOP.setParentOperators(Utilities.makeList(mux));
@@ -229,7 +234,8 @@ public class QueryPlanTreeTransformation {
           }
         }
         MuxDesc muxDesc = new MuxDesc(siblingOPs);
-        Operator<? extends OperatorDesc> mux = OperatorFactory.get(muxDesc);
+        Operator<? extends OperatorDesc> mux = OperatorFactory.get(
+            rsop.getCompilationOpContext(), muxDesc);
         mux.setChildOperators(Utilities.makeList(childOP));
         mux.setParentOperators(parentsOfMux);

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
index e9ca5fa..9ea0857 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
@@ -177,7 +177,6 @@ public class CommonJoinTaskDispatcher extends AbstractJoinTaskDispatcher impleme
     MapRedTask newTask = (MapRedTask) TaskFactory.get(newWork, physicalContext
         .getParseContext().getConf());
     JoinOperator newJoinOp = getJoinOp(newTask);
-
     // optimize this newWork given the big table position
     MapJoinProcessor.genMapJoinOpAndLocalWork(physicalContext.getParseContext().getConf(),
         newWork, newJoinOp, bigTablePosition);
@@ -513,8 +512,7 @@ public class CommonJoinTaskDispatcher extends AbstractJoinTaskDispatcher impleme
           taskToAliases.put(newTask, aliases);
         }
       } catch (Exception e) {
-        e.printStackTrace();
-        throw new SemanticException("Generate Map Join Task Error: " + e.getMessage());
+        throw new SemanticException("Generate Map Join Task Error: " + e.getMessage(), e);
       }
 
       // insert current common join task to conditional task
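QueryPlanTreeTransformation shows what happens when there is no single obvious donor operator: the context is lifted off the first ReduceSinkOperator encountered, on the apparent assumption that every operator of one compiled plan carries the same CompilationOpContext. Distilled into a sketch (the wrapper is illustrative):

  import java.util.List;
  import org.apache.hadoop.hive.ql.CompilationOpContext;
  import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;

  class FirstCtxSketch {
    // Mirrors the hunk above: take the context of the first operator seen;
    // all members of one compiled plan are expected to share it.
    static CompilationOpContext contextOf(List<ReduceSinkOperator> bottomReduceSinks) {
      CompilationOpContext opCtx = null;
      for (ReduceSinkOperator rsop : bottomReduceSinks) {
        if (opCtx == null) {
          opCtx = rsop.getCompilationOpContext();
        }
        // ...per-operator rewiring elided...
      }
      return opCtx;
    }
  }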
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
index a71c474..9fbbd4c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
@@ -259,7 +259,8 @@ public final class GenMRSkewJoinProcessor {
       Operator<? extends OperatorDesc>[] parentOps = new TableScanOperator[tags.length];
       for (int k = 0; k < tags.length; k++) {
         Operator<? extends OperatorDesc> ts =
-            GenMapRedUtils.createTemporaryTableScanOperator(rowSchemaList.get((byte)k));
+            GenMapRedUtils.createTemporaryTableScanOperator(
+                joinOp.getCompilationOpContext(), rowSchemaList.get((byte)k));
         ((TableScanOperator)ts).setTableDesc(tableDescList.get((byte)k));
         parentOps[k] = ts;
       }
@@ -310,8 +311,8 @@ public final class GenMRSkewJoinProcessor {
       newPlan.setMapRedLocalWork(localPlan);
 
       // construct a map join and set it as the child operator of tblScan_op
-      MapJoinOperator mapJoinOp = (MapJoinOperator) OperatorFactory
-          .getAndMakeChild(mapJoinDescriptor, (RowSchema) null, parentOps);
+      MapJoinOperator mapJoinOp = (MapJoinOperator) OperatorFactory.getAndMakeChild(
+          joinOp.getCompilationOpContext(), mapJoinDescriptor, (RowSchema) null, parentOps);
       // change the children of the original join operator to point to the map
       // join operator
       List<Operator<? extends OperatorDesc>> childOps = cloneJoinOp

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
index 4805162..11ec07a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
@@ -232,8 +232,8 @@ public class GenSparkSkewJoinProcessor {
       // create N TableScans
       Operator<? extends OperatorDesc>[] parentOps = new TableScanOperator[tags.length];
       for (int k = 0; k < tags.length; k++) {
-        Operator<? extends OperatorDesc> ts =
-            GenMapRedUtils.createTemporaryTableScanOperator(rowSchemaList.get((byte) k));
+        Operator<? extends OperatorDesc> ts = GenMapRedUtils.createTemporaryTableScanOperator(
+            joinOp.getCompilationOpContext(), rowSchemaList.get((byte) k));
         ((TableScanOperator) ts).setTableDesc(tableDescList.get((byte) k));
         parentOps[k] = ts;
       }
@@ -249,7 +249,7 @@ public class GenSparkSkewJoinProcessor {
       mapJoinDescriptor.setNullSafes(joinDescriptor.getNullSafes());
       // temporarily, mark it as child of all the TS
       MapJoinOperator mapJoinOp = (MapJoinOperator) OperatorFactory
-          .getAndMakeChild(mapJoinDescriptor, null, parentOps);
+          .getAndMakeChild(joinOp.getCompilationOpContext(), mapJoinDescriptor, null, parentOps);
 
       // clone the original join operator, and replace it with the MJ
       // this makes sure MJ has the same downstream operator plan as the original join
@@ -360,7 +360,8 @@ public class GenSparkSkewJoinProcessor {
     Preconditions.checkArgument(tableScan.getChildOperators().size() == 1
        && tableScan.getChildOperators().get(0) instanceof MapJoinOperator);
     HashTableDummyDesc desc = new HashTableDummyDesc();
-    HashTableDummyOperator dummyOp = (HashTableDummyOperator) OperatorFactory.get(desc);
+    HashTableDummyOperator dummyOp = (HashTableDummyOperator) OperatorFactory.get(
+        tableScan.getCompilationOpContext(), desc);
     dummyOp.getConf().setTbl(tableScan.getTableDesc());
     MapJoinOperator mapJoinOp = (MapJoinOperator) tableScan.getChildOperators().get(0);
     mapJoinOp.replaceParent(tableScan, dummyOp);
@@ -373,8 +374,8 @@ public class GenSparkSkewJoinProcessor {
     // mapjoin should not be affected by join reordering
     mjDesc.resetOrder();
     SparkHashTableSinkDesc hashTableSinkDesc = new SparkHashTableSinkDesc(mjDesc);
-    SparkHashTableSinkOperator hashTableSinkOp =
-        (SparkHashTableSinkOperator) OperatorFactory.get(hashTableSinkDesc);
+    SparkHashTableSinkOperator hashTableSinkOp = (SparkHashTableSinkOperator)OperatorFactory.get(
+        tableScan.getCompilationOpContext(), hashTableSinkDesc);
     int[] valueIndex = mjDesc.getValueIndex(tag);
     if (valueIndex != null) {
       List<ExprNodeDesc> newValues = new ArrayList<ExprNodeDesc>();

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java
index df598e7..9ca815c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java
@@ -141,7 +141,7 @@ public final class LocalMapJoinProcFactory {
       HashTableSinkDesc hashTableSinkDesc = new HashTableSinkDesc(mapJoinDesc);
       HashTableSinkOperator hashTableSinkOp = (HashTableSinkOperator) OperatorFactory
-          .get(hashTableSinkDesc);
+          .get(mapJoinOp.getCompilationOpContext(), hashTableSinkDesc);
 
       // get the last operator for processing big tables
       int bigTable = mapJoinDesc.getPosBigTable();
@@ -203,7 +203,8 @@ public final class LocalMapJoinProcFactory {
       // create new operator: HashTable DummyOperator, which share the table desc
       HashTableDummyDesc desc = new HashTableDummyDesc();
-      HashTableDummyOperator dummyOp = (HashTableDummyOperator) OperatorFactory.get(desc);
+      HashTableDummyOperator dummyOp = (HashTableDummyOperator) OperatorFactory.get(
+          parent.getCompilationOpContext(), desc);
       TableDesc tbl;
 
       if (parent.getSchema() == null) {
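In the two skew-join processors and LocalMapJoinProcFactory, every dummy or sink operator that stands in for an existing operator borrows that operator's context, so the substitution stays within the same compilation. A sketch based on the GenSparkSkewJoinProcessor hunk above (the wrapper is illustrative):

  import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
  import org.apache.hadoop.hive.ql.exec.OperatorFactory;
  import org.apache.hadoop.hive.ql.exec.TableScanOperator;
  import org.apache.hadoop.hive.ql.plan.HashTableDummyDesc;

  class DummyOpSketch {
    // A dummy operator replacing a table scan borrows the scan's context, so
    // the substitution is invisible to anything keyed on the compilation.
    static HashTableDummyOperator dummyFor(TableScanOperator tableScan) {
      HashTableDummyDesc desc = new HashTableDummyDesc();
      HashTableDummyOperator dummyOp = (HashTableDummyOperator) OperatorFactory.get(
          tableScan.getCompilationOpContext(), desc);
      dummyOp.getConf().setTbl(tableScan.getTableDesc());
      return dummyOp;
    }
  }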
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 1629a5d..f462bd6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -1894,7 +1894,8 @@ public class Vectorizer implements PhysicalPlanResolver {
       break;
     }
 
-    vectorOp = OperatorFactory.getVectorOperator(opClass, op.getConf(), vContext);
+    vectorOp = OperatorFactory.getVectorOperator(
+        opClass, op.getCompilationOpContext(), op.getConf(), vContext);
     LOG.info("Vectorizer vectorizeOperator map join class " + vectorOp.getClass().getSimpleName());
 
     boolean minMaxEnabled = HiveConf.getBoolVar(hiveConf,
@@ -2030,7 +2031,8 @@ public class Vectorizer implements PhysicalPlanResolver {
     vectorDesc.setReduceSinkKeyType(reduceSinkKeyType);
     vectorDesc.setVectorReduceSinkInfo(vectorReduceSinkInfo);
 
-    vectorOp = OperatorFactory.getVectorOperator(opClass, op.getConf(), vContext);
+    vectorOp = OperatorFactory.getVectorOperator(
+        opClass, op.getCompilationOpContext(), op.getConf(), vContext);
     LOG.info("Vectorizer vectorizeOperator reduce sink class " + vectorOp.getClass().getSimpleName());
 
     return vectorOp;
@@ -2179,7 +2181,8 @@ public class Vectorizer implements PhysicalPlanResolver {
             opClass = VectorSMBMapJoinOperator.class;
           }
 
-          vectorOp = OperatorFactory.getVectorOperator(opClass, op.getConf(), vContext);
+          vectorOp = OperatorFactory.getVectorOperator(
+              opClass, op.getCompilationOpContext(), op.getConf(), vContext);
 
         } else {
@@ -2200,7 +2203,8 @@ public class Vectorizer implements PhysicalPlanResolver {
 
           if (!specialize) {
 
-            vectorOp = OperatorFactory.getVectorOperator(op.getConf(), vContext);
+            vectorOp = OperatorFactory.getVectorOperator(
+                op.getCompilationOpContext(), op.getConf(), vContext);
 
          } else {
@@ -2217,7 +2221,8 @@ public class Vectorizer implements PhysicalPlanResolver {
       case EXTRACT:
       case EVENT:
       case HASHTABLESINK:
-        vectorOp = OperatorFactory.getVectorOperator(op.getConf(), vContext);
+        vectorOp = OperatorFactory.getVectorOperator(
+            op.getCompilationOpContext(), op.getConf(), vContext);
         break;
       default:
         vectorOp = op;

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java
index 5b3125b..f48fac1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java
@@ -197,7 +197,8 @@ public class SparkReduceSinkMapJoinProc implements NodeProcessor {
 
     // create an new operator: HashTableDummyOperator, which share the table desc
     HashTableDummyDesc desc = new HashTableDummyDesc();
-    HashTableDummyOperator dummyOp = (HashTableDummyOperator) OperatorFactory.get(desc);
+    HashTableDummyOperator dummyOp = (HashTableDummyOperator) OperatorFactory.get(
+        mapJoinOp.getCompilationOpContext(), desc);
     TableDesc tbl;
 
     // need to create the correct table descriptor for key/value
@@ -261,8 +262,8 @@ public class SparkReduceSinkMapJoinProc implements NodeProcessor {
       mjDesc.setHashTableMemoryUsage(hashtableMemoryUsage);
 
       SparkHashTableSinkDesc hashTableSinkDesc = new SparkHashTableSinkDesc(mjDesc);
-      SparkHashTableSinkOperator hashTableSinkOp =
-          (SparkHashTableSinkOperator) OperatorFactory.get(hashTableSinkDesc);
+      SparkHashTableSinkOperator hashTableSinkOp = (SparkHashTableSinkOperator) OperatorFactory.get(
+          mapJoinOp.getCompilationOpContext(), hashTableSinkDesc);
 
       byte tag = (byte) pos;
       int[] valueIndex = mjDesc.getValueIndex(tag);

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
index 94947d6..2a7f3d4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
@@ -196,8 +196,7 @@ public final class UnionProcFactory {
         Map<String, ExprNodeDesc> origColExprMap = originalOp.getColumnExprMap();
 
         Operator<? extends OperatorDesc> cloneOp =
-            OperatorFactory.getAndMakeChild(
-                cloneDesc,
+            OperatorFactory.getAndMakeChild(cloneDesc,
                 origSchema == null ? null : new RowSchema(origSchema),
                 origColExprMap == null ? null : new HashMap(origColExprMap),
                 parents.get(p));

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
index cead5ae..8a9411a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.metastore.HiveMetaStore;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryProperties;
@@ -90,6 +91,7 @@ public abstract class BaseSemanticAnalyzer {
   protected final Logger LOG;
   protected final LogHelper console;
 
+  protected CompilationOpContext cContext;
   protected Context ctx;
   protected HashMap<String, String> idToTableNameMap;
   protected QueryProperties queryProperties;
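The Vectorizer hunks above all make the same move: the vectorized replacement operator is created with the context of the row-mode operator it replaces, via the widened OperatorFactory.getVectorOperator() overloads. Reduced to a sketch (the wrapper method is illustrative):

  import org.apache.hadoop.hive.ql.exec.Operator;
  import org.apache.hadoop.hive.ql.exec.OperatorFactory;
  import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
  import org.apache.hadoop.hive.ql.metadata.HiveException;
  import org.apache.hadoop.hive.ql.plan.OperatorDesc;

  class VectorizeSketch {
    // The vectorized replacement inherits the context of the operator it
    // replaces, keeping both plans attached to the same compilation.
    static Operator<? extends OperatorDesc> vectorize(
        Operator<? extends OperatorDesc> op, VectorizationContext vContext)
        throws HiveException {
      return OperatorFactory.getVectorOperator(
          op.getCompilationOpContext(), op.getConf(), vContext);
    }
  }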
operator tree will be constructed. - fetchTask.getWork().initializeForFetch(); + fetchTask.getWork().initializeForFetch(ctx.getOpContext()); } ParseContext pCtx = null; http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java index 1a49de1..46d279e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java @@ -290,9 +290,8 @@ public class GenTezWork implements NodeProcessor { LOG.debug("Cloning reduce sink for multi-child broadcast edge"); // we've already set this one up. Need to clone for the next work. r = (ReduceSinkOperator) OperatorFactory.getAndMakeChild( - (ReduceSinkDesc)r.getConf().clone(), - new RowSchema(r.getSchema()), - r.getParentOperators()); + r.getCompilationOpContext(), (ReduceSinkDesc)r.getConf().clone(), + new RowSchema(r.getSchema()), r.getParentOperators()); context.clonedReduceSinks.add(r); } r.getConf().setOutputName(work.getName()); http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java index 6f9948e..93b7a66 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java @@ -190,7 +190,8 @@ public class ProcessAnalyzeTable implements NodeProcessor { // partial scan task DriverContext driverCxt = new DriverContext(); Task<PartialScanWork> partialScanTask = TaskFactory.get(scanWork, parseContext.getConf()); - partialScanTask.initialize(parseContext.getConf(), null, driverCxt); + partialScanTask.initialize(parseContext.getConf(), null, driverCxt, + tableScan.getCompilationOpContext()); partialScanTask.setWork(scanWork); statsWork.setSourceTask(partialScanTask); http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index c38699d..a0251fb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -66,6 +66,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryProperties; import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; @@ -418,6 +419,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { analyzeRewrite, tableDesc, queryProperties); } + public CompilationOpContext getOpContext() { + return ctx.getOpContext(); + } + @SuppressWarnings("nls") public void doPhase1QBExpr(ASTNode ast, QBExpr qbexpr, String id, String alias) throws 
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index c38699d..a0251fb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryProperties;
 import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
@@ -418,6 +419,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         analyzeRewrite, tableDesc, queryProperties);
   }

+  public CompilationOpContext getOpContext() {
+    return ctx.getOpContext();
+  }
+
   @SuppressWarnings("nls")
   public void doPhase1QBExpr(ASTNode ast, QBExpr qbexpr, String id, String alias) throws SemanticException {
@@ -3323,8 +3328,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         .getChild(inputRecordWriterNum));
     Class<? extends RecordReader> errRecordReader = getDefaultRecordReader();

-    Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
-        new ScriptDesc(
+    Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(new ScriptDesc(
         fetchFilesNotInLocalFilesystem(stripQuotes(trfm.getChild(execPos).getText())),
         inInfo, inRecordWriter, outInfo, outRecordReader, errRecordReader, errInfo),
         new RowSchema(out_rwsch.getColumnInfos()), input), out_rwsch);
@@ -5424,8 +5428,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       FilterDesc orFilterDesc = new FilterDesc(previous, false);
       orFilterDesc.setGenerated(true);

-      selectInput = putOpInsertMap(OperatorFactory.getAndMakeChild(
-          orFilterDesc, new RowSchema(
+      selectInput = putOpInsertMap(OperatorFactory.getAndMakeChild(orFilterDesc, new RowSchema(
           inputRR.getColumnInfos()), input), inputRR);
     }
@@ -6636,8 +6639,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       fileSinkDesc.setStaticSpec(dpCtx.getSPPath());
     }

-    Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(fileSinkDesc,
-        fsRS, input), inputRR);
+    Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
+        fileSinkDesc, fsRS, input), inputRR);

     if (ltd != null && SessionState.get() != null) {
       SessionState.get().getLineageState()
@@ -7382,7 +7385,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     desc.setReversedExprs(reversedExprs);
     desc.setFilterMap(join.getFilterMap());

-    JoinOperator joinOp = (JoinOperator) OperatorFactory.getAndMakeChild(desc,
+    JoinOperator joinOp = (JoinOperator) OperatorFactory.getAndMakeChild(getOpContext(), desc,
         new RowSchema(outputRR.getColumnInfos()), rightOps);
     joinOp.setColumnExprMap(colExprMap);
     joinOp.setPosToAliasMap(posToAliasMap);
@@ -7499,8 +7502,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         reduceKeys.size(), numReds, AcidUtils.Operation.NOT_ACID);

     ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
-        OperatorFactory.getAndMakeChild(rsDesc, new RowSchema(outputRR
-            .getColumnInfos()), child), outputRR);
+        OperatorFactory.getAndMakeChild(rsDesc, new RowSchema(outputRR.getColumnInfos()),
+            child), outputRR);
     List<String> keyColNames = rsDesc.getOutputKeyColumnNames();
     for (int i = 0 ; i < keyColNames.size(); i++) {
       colExprMap.put(Utilities.ReduceField.KEY + "." + keyColNames.get(i), reduceKeys.get(i));
@@ -9090,7 +9093,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     // Create a new union operator
     Operator<? extends OperatorDesc> unionforward = OperatorFactory
-        .getAndMakeChild(new UnionDesc(), new RowSchema(unionoutRR
+        .getAndMakeChild(getOpContext(), new UnionDesc(), new RowSchema(unionoutRR
             .getColumnInfos()));

     // set union operator as child of each of leftOp and rightOp
@@ -9332,7 +9335,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       nameToSplitSample.remove(alias_id);
     }

-    top = (TableScanOperator) putOpInsertMap(OperatorFactory.get(tsDesc,
+    top = (TableScanOperator) putOpInsertMap(OperatorFactory.get(getOpContext(), tsDesc,
         new RowSchema(rwsch.getColumnInfos())), rwsch);

     // Add this to the list of top operators - we always start from a table
@@ -11988,8 +11991,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       ptfDesc.setMapSide(true);
       input = putOpInsertMap(OperatorFactory.getAndMakeChild(ptfDesc,
-          new RowSchema(ptfMapRR.getColumnInfos()),
-          input), ptfMapRR);
+          new RowSchema(ptfMapRR.getColumnInfos()), input), ptfMapRR);
       rr = opParseCtx.get(input).getRowResolver();
     }
@@ -12052,8 +12054,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       PTFDesc ptfDesc = translator.translate(wSpec, this, conf, rr, unparseTranslator);
       RowResolver ptfOpRR = ptfDesc.getFuncDef().getOutputShape().getRr();
       input = putOpInsertMap(OperatorFactory.getAndMakeChild(ptfDesc,
-          new RowSchema(ptfOpRR.getColumnInfos()),
-          input), ptfOpRR);
+          new RowSchema(ptfOpRR.getColumnInfos()), input), ptfOpRR);
       input = genSelectAllDesc(input);
       rr = ptfOpRR;
     }
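[Annotation] The SemanticAnalyzer hunks above show the two call shapes side by side: operators that start a tree (table scans, joins, unions) now receive getOpContext() explicitly, while the getAndMakeChild overloads without a context argument keep working, presumably by borrowing the context from a parent operator. A sketch of that split, with all names hypothetical and an AtomicInteger standing in for the real context:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical sketch of the two construction paths visible above;
    // Op and FactorySketch are stand-ins, not the Hive classes.
    class Op<T> {
      final AtomicInteger ctx;          // shared per-query id source
      final int id;                     // unique within one compilation
      final T desc;
      final List<Op<?>> children = new ArrayList<>();

      Op(AtomicInteger ctx, T desc) {
        this.ctx = ctx;
        this.id = ctx.getAndIncrement();
        this.desc = desc;
      }

      AtomicInteger getCompilationOpContext() {
        return ctx;
      }
    }

    final class FactorySketch {
      // Root operators (table scans, joins, unions) get the context
      // explicitly -- there is no parent to inherit it from.
      static <T> Op<T> get(AtomicInteger ctx, T desc) {
        return new Op<>(ctx, desc);
      }

      // Child operators borrow the context from an existing parent, which
      // is why most getAndMakeChild call sites did not need a new argument.
      static <T> Op<T> getAndMakeChild(T desc, Op<?> parent) {
        Op<T> child = new Op<>(parent.getCompilationOpContext(), desc);
        parent.children.add(child);
        return child;
      }
    }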
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
index ea5e414..afbeccb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
@@ -185,7 +185,8 @@ public class GenSparkWork implements NodeProcessor {
         LOG.debug("Cloning reduce sink for multi-child broadcast edge");
         // we've already set this one up. Need to clone for the next work.
         r = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
-            (ReduceSinkDesc)r.getConf().clone(), r.getParentOperators());
+            r.getCompilationOpContext(), (ReduceSinkDesc)r.getConf().clone(),
+            r.getParentOperators());
       }
       r.getConf().setOutputName(work.getName());
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
index fa8a53a..3f31fb5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -53,6 +54,15 @@ public class SparkPartitionPruningSinkOperator extends Operator<SparkPartitionPr
   protected transient DataOutputBuffer buffer;
   protected static final Logger LOG = LoggerFactory.getLogger(SparkPartitionPruningSinkOperator.class);

+  /** Kryo ctor. */
+  protected SparkPartitionPruningSinkOperator() {
+    super();
+  }
+
+  public SparkPartitionPruningSinkOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
   @SuppressWarnings("deprecation")
   public void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
index e4e7c98..80ccb28 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
@@ -187,7 +187,8 @@ public class SparkProcessAnalyzeTable implements NodeProcessor {
       @SuppressWarnings("unchecked")
       Task<PartialScanWork> partialScanTask = TaskFactory.get(scanWork, parseContext.getConf());
-      partialScanTask.initialize(parseContext.getConf(), null, driverCxt);
+      partialScanTask.initialize(parseContext.getConf(), null, driverCxt,
+          tableScan.getCompilationOpContext());
       partialScanTask.setWork(scanWork);
       statsWork.setSourceTask(partialScanTask);
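[Annotation] SparkPartitionPruningSinkOperator above picks up the same constructor convention the commit applies throughout: a protected no-argument constructor reserved for Kryo deserialization plus a public constructor taking the compilation context. A minimal sketch of the convention, with BaseOp standing in for Hive's Operator base class and Object for the context type:

    // Hedged sketch of the three-constructor convention, not Hive code.
    abstract class BaseOp {
      protected Object cContext;   // restored by Kryo or set at compile time

      /** Kryo ctor: fields are filled in by deserialization afterwards. */
      protected BaseOp() {
      }

      protected BaseOp(Object ctx) {
        this.cContext = ctx;
      }
    }

    class MyOp extends BaseOp {
      /** Kryo ctor: protected so only the serializer path uses it. */
      protected MyOp() {
        super();
      }

      /** Compile-time ctor: every planner-built operator carries the context. */
      public MyOp(Object ctx) {
        super(ctx);
      }
    }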
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
index 2bce69e..ad34b98 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
@@ -109,6 +109,7 @@ public abstract class BaseWork extends AbstractOperatorDesc {
   public abstract void replaceRoots(Map<Operator<?>, Operator<?>> replacementMap);

   public abstract Set<Operator<? extends OperatorDesc>> getAllRootOperators();
+  public abstract Operator<? extends OperatorDesc> getAnyRootOperator();

   public Set<Operator<?>> getAllOperators() {

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java
index 1d2c24d..76811b1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsWork.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.plan;

 import java.io.Serializable;

+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.ListSinkOperator;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;

@@ -72,8 +73,8 @@ public class ColumnStatsWork implements Serializable {
     return fWork.getSink();
   }

-  public void initializeForFetch() {
-    fWork.initializeForFetch();
+  public void initializeForFetch(CompilationOpContext ctx) {
+    fWork.initializeForFetch(ctx);
   }

   public int getLeastNumRows() {

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
index edd9cac..d68c64c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FetchWork.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.TreeMap;

 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.ListSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -100,10 +101,10 @@ public class FetchWork implements Serializable {
     this.limit = limit;
   }

-  public void initializeForFetch() {
+  public void initializeForFetch(CompilationOpContext ctx) {
     if (source == null) {
       ListSinkDesc desc = new ListSinkDesc(serializationNullFormat);
-      sink = (ListSinkOperator) OperatorFactory.get(desc);
+      sink = (ListSinkOperator) OperatorFactory.get(ctx, desc);
       source = sink;
     }
   }
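[Annotation] FetchWork.initializeForFetch above keeps its lazy, idempotent shape -- the ListSinkOperator is created at most once -- but the caller must now supply the context the new operator is stamped with (the ExplainSemanticAnalyzer hunk earlier passes ctx.getOpContext()). A sketch of the idiom, using placeholder types rather than Hive's:

    // Hedged sketch of the lazy-sink idiom; SinkSketch is a placeholder,
    // not Hive's ListSinkOperator.
    class FetchWorkSketch {
      private SinkSketch source;
      private SinkSketch sink;

      void initializeForFetch(Object ctx) {
        if (source == null) {            // idempotent: later calls no-op
          sink = new SinkSketch(ctx);    // operator id comes from ctx
          source = sink;
        }
      }
    }

    class SinkSketch {
      final Object ctx;

      SinkSketch(Object ctx) {
        this.ctx = ctx;
      }
    }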
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index ffaf129..3ef50fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -399,6 +399,11 @@ public class MapWork extends BaseWork {
     return opSet;
   }

+  @Override
+  public Operator<? extends OperatorDesc> getAnyRootOperator() {
+    return aliasToWork.isEmpty() ? null : aliasToWork.values().iterator().next();
+  }
+
   public void mergeAliasedInput(String alias, String pathDir, PartitionDesc partitionInfo) {
     ArrayList<String> aliases = pathToAliases.get(pathDir);
     if (aliases == null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
index da1010b..aa7f6ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
@@ -82,4 +82,9 @@ public class MapredWork extends AbstractOperatorDesc {
     return ops;
   }
-}
+
+  public Operator<?> getAnyOperator() {
+    Operator<?> result = mapWork.getAnyRootOperator();
+    if (result != null) return result;
+    return (reduceWork != null) ? reduceWork.getAnyRootOperator() : null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java
index b088326..a5527dc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeJoinWork.java
@@ -52,6 +52,11 @@ public class MergeJoinWork extends BaseWork {
   }

   @Override
+  public Operator<?> getAnyRootOperator() {
+    return getMainWork().getAnyRootOperator();
+  }
+
+  @Override
   public void configureJobConf(JobConf job) {
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
index 0ac625f..1c31962 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Set;

 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
@@ -197,6 +198,11 @@ public class ReduceWork extends BaseWork {
     return opSet;
   }

+  @Override
+  public Operator<? extends OperatorDesc> getAnyRootOperator() {
+    return getReducer();
+  }
+
   /**
    * If the number of reducers is -1, the runtime will automatically figure it
    * out by input data size.
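[Annotation] The getAnyRootOperator implementations above (and UnionWork's below, which returns null) all answer the same question: hand me one representative root operator, or null if the work holds none. Why a caller wants one is not stated in this diff; a plausible reading, sketched below with hypothetical names, is to recover shared compile-time state from whichever half of a plan actually has operators:

    // Hedged sketch of the accessor contract added to BaseWork above;
    // the use of the result is an assumption, not something the diff states.
    interface WorkSketch {
      OpHandle getAnyRootOperator();   // may be null (e.g. union work)
    }

    class OpHandle {
      Object getCompilationOpContext() {
        return null;                   // placeholder body
      }
    }

    class AnyOperatorSketch {
      // Mirrors how MapredWork.getAnyOperator composes the two halves.
      static OpHandle anyOperator(WorkSketch map, WorkSketch reduce) {
        OpHandle op = map.getAnyRootOperator();
        if (op != null) {
          return op;
        }
        return (reduce != null) ? reduce.getAnyRootOperator() : null;
      }
    }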
http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java
index 3fecff3..5e30ece 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java
@@ -64,6 +64,11 @@ public class UnionWork extends BaseWork {
     return new HashSet<Operator<?>>();
   }

+  @Override
+  public Operator<? extends OperatorDesc> getAnyRootOperator() {
+    return null;
+  }
+
   public void addUnionOperators(Collection<UnionOperator> unions) {
     unionOperators.addAll(unions);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
index 4702f01..5390ba7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
@@ -885,8 +885,7 @@ public final class OpProcFactory {
         .getChildOperators();
     op.setChildOperators(null);
     Operator<FilterDesc> output = OperatorFactory.getAndMakeChild(
-        new FilterDesc(condn, false), new RowSchema(inputRS.getSignature()),
-        op);
+        new FilterDesc(condn, false), new RowSchema(inputRS.getSignature()), op);
     output.setChildOperators(originalChilren);
     for (Operator<? extends OperatorDesc> ch : originalChilren) {
       List<Operator<? extends OperatorDesc>> parentOperators = ch

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicateTransitivePropagate.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicateTransitivePropagate.java b/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicateTransitivePropagate.java
index b091ebc..8066292 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicateTransitivePropagate.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicateTransitivePropagate.java
@@ -106,8 +106,8 @@ public class PredicateTransitivePropagate extends Transform {
   // insert filter operator between target(child) and input(parent)
   private Operator<FilterDesc> createFilter(Operator<?> target, Operator<?> parent,
       RowSchema parentRS, ExprNodeDesc filterExpr) {
-    Operator<FilterDesc> filter = OperatorFactory.get(new FilterDesc(filterExpr, false),
-        new RowSchema(parentRS.getSignature()));
+    Operator<FilterDesc> filter = OperatorFactory.get(parent.getCompilationOpContext(),
+        new FilterDesc(filterExpr, false), new RowSchema(parentRS.getSignature()));
     filter.getParentOperators().add(parent);
     filter.getChildOperators().add(target);
     parent.replaceChild(target, filter);

http://git-wip-us.apache.org/repos/asf/hive/blob/88fceaca/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java b/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
index 5d5f02d..71c7310 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ppd/SyntheticJoinPredicate.java
@@ -107,8 +107,8 @@ public class SyntheticJoinPredicate extends Transform {
       RowSchema parentRS, ExprNodeDesc filterExpr) {
     FilterDesc filterDesc = new FilterDesc(filterExpr, false);
     filterDesc.setSyntheticJoinPredicate(true);
-    Operator<FilterDesc> filter = OperatorFactory.get(filterDesc,
-        new RowSchema(parentRS.getSignature()));
+    Operator<FilterDesc> filter = OperatorFactory.get(parent.getCompilationOpContext(),
+        filterDesc, new RowSchema(parentRS.getSignature()));
     filter.getParentOperators().add(parent);
     filter.getChildOperators().add(target);
     parent.replaceChild(target, filter);
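[Annotation] Both ppd transforms above share one splice pattern: the new filter operator borrows its parent's compilation context and is then wired in between parent and target. A self-contained sketch of that pattern, with FilterNode as a placeholder for Hive's operator type:

    import java.util.ArrayList;
    import java.util.List;

    // Hedged sketch of the splice-a-filter pattern, not Hive code.
    class FilterNode {
      final Object ctx;
      final List<FilterNode> parents = new ArrayList<>();
      final List<FilterNode> children = new ArrayList<>();

      FilterNode(Object ctx) {
        this.ctx = ctx;
      }

      Object getCompilationOpContext() {
        return ctx;
      }

      void replaceChild(FilterNode oldChild, FilterNode newChild) {
        int i = children.indexOf(oldChild);
        if (i >= 0) {
          children.set(i, newChild);
        }
      }

      // Result: parent -> filter -> target, with the filter inheriting
      // the parent's per-query context instead of taking a new one.
      static FilterNode createFilter(FilterNode target, FilterNode parent) {
        FilterNode filter = new FilterNode(parent.getCompilationOpContext());
        filter.parents.add(parent);
        filter.children.add(target);
        parent.replaceChild(target, filter);
        return filter;
      }
    }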