http://git-wip-us.apache.org/repos/asf/hive/blob/16d28b34/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 3a179a3..6167f48 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.optimizer.physical;
 import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.UNIFORM;
 
 import java.io.Serializable;
+import java.lang.annotation.Annotation;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -33,6 +34,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.Stack;
 import java.util.regex.Pattern;
+import org.apache.commons.lang.ArrayUtils;
 
 import org.apache.calcite.util.Pair;
 import org.apache.commons.lang.ArrayUtils;
@@ -43,6 +45,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.*;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
@@ -62,7 +66,11 @@ import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterStringOpe
 import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkLongOperator;
 import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkMultiKeyOperator;
 import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkStringOperator;
+import org.apache.hadoop.hive.ql.exec.vector.udf.VectorUDFAdaptor;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.VectorColumnOutputMapping;
+import org.apache.hadoop.hive.ql.exec.vector.VectorColumnSourceMapping;
+import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOuterFilteredOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator;
@@ -73,6 +81,7 @@ import org.apache.hadoop.hive.ql.exec.vector.expressions.IdentityExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
@@ -91,18 +100,36 @@ import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc;
 import org.apache.hadoop.hive.ql.plan.AggregationDesc;
+import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.HashTableSinkDesc;
 import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.LimitDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.VectorAppMasterEventDesc;
+import org.apache.hadoop.hive.ql.plan.VectorFileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.VectorFilterDesc;
+import org.apache.hadoop.hive.ql.plan.VectorTableScanDesc;
+import org.apache.hadoop.hive.ql.plan.VectorizationCondition;
 import org.apache.hadoop.hive.ql.plan.VectorGroupByDesc.ProcessingMode;
+import org.apache.hadoop.hive.ql.plan.VectorSparkHashTableSinkDesc;
+import org.apache.hadoop.hive.ql.plan.VectorLimitDesc;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinInfo;
 import org.apache.hadoop.hive.ql.plan.VectorPartitionConversion;
+import org.apache.hadoop.hive.ql.plan.VectorSMBJoinDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
@@ -117,10 +144,13 @@ import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableImplementationType;
 import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
 import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKind;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.OperatorVariation;
 import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc.VectorDeserializeType;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinInfo;
 import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo;
 import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc;
+import org.apache.hadoop.hive.ql.plan.VectorSelectDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.udf.UDFAcos;
 import org.apache.hadoop.hive.ql.udf.UDFAsin;
@@ -170,6 +200,9 @@ import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.NullStructSerDe;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
@@ -182,6 +215,9 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hive.common.util.AnnotationUtils;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.apache.hive.common.util.ReflectionUtil;
 
 import com.google.common.base.Preconditions;
@@ -234,12 +270,39 @@ public class Vectorizer implements PhysicalPlanResolver {
 
   private boolean isSpark;
 
-  boolean useVectorizedInputFileFormat;
-  boolean useVectorDeserialize;
-  boolean useRowDeserialize;
+  private boolean useVectorizedInputFileFormat;
+  private boolean useVectorDeserialize;
+  private boolean useRowDeserialize;
+  private boolean isReduceVectorizationEnabled;
 
   boolean isSchemaEvolution;
 
+  private BaseWork currentBaseWork;
+  private Operator<? extends OperatorDesc> currentOperator;
+
+  public void testSetCurrentBaseWork(BaseWork testBaseWork) {
+    currentBaseWork = testBaseWork;
+  }
+
+  private void setNodeIssue(String issue) {
+    currentBaseWork.setNotVectorizedReason(
+        VectorizerReason.createNodeIssue(issue));
+  }
+
+  private void setOperatorIssue(String issue) {
+    currentBaseWork.setNotVectorizedReason(
+        VectorizerReason.createOperatorIssue(currentOperator, issue));
+  }
+
+  private void setExpressionIssue(String expressionTitle, String issue) {
+    currentBaseWork.setNotVectorizedReason(
+        VectorizerReason.createExpressionIssue(currentOperator, expressionTitle, issue));
+  }
+
+  private void clearNotVectorizedReason() {
+    currentBaseWork.setNotVectorizedReason(null);
+  }
+
   public Vectorizer() {
 
     supportedGenericUDFs.add(GenericUDFOPPlus.class);
@@ -369,6 +432,10 @@ public class Vectorizer implements PhysicalPlanResolver {
     int partitionColumnCount;
     boolean useVectorizedInputFileFormat;
 
+    boolean groupByVectorOutput;
+    boolean allNative;
+    boolean usesVectorUDFAdaptor;
+
     String[] scratchTypeNameArray;
 
     Set<Operator<? extends OperatorDesc>> nonVectorizedOps;
@@ -379,6 +446,12 @@ public class Vectorizer implements PhysicalPlanResolver {
       partitionColumnCount = 0;
     }
 
+    public void assume() {
+      groupByVectorOutput = true;
+      allNative = true;
+      usesVectorUDFAdaptor = false;
+    }
+
     public void setAllColumnNames(List<String> allColumnNames) {
       this.allColumnNames = allColumnNames;
     }
@@ -394,9 +467,19 @@ public class Vectorizer implements PhysicalPlanResolver {
     public void setScratchTypeNameArray(String[] scratchTypeNameArray) {
       this.scratchTypeNameArray = scratchTypeNameArray;
     }
+    public void setGroupByVectorOutput(boolean groupByVectorOutput) {
+      this.groupByVectorOutput = groupByVectorOutput;
+    }
+    public void setAllNative(boolean allNative) {
+      this.allNative = allNative;
+    }
+    public void setUsesVectorUDFAdaptor(boolean usesVectorUDFAdaptor) {
+      this.usesVectorUDFAdaptor = usesVectorUDFAdaptor;
+    }
     public void setUseVectorizedInputFileFormat(boolean useVectorizedInputFileFormat) {
       this.useVectorizedInputFileFormat = useVectorizedInputFileFormat;
     }
+
     public void setNonVectorizedOps(Set<Operator<? extends OperatorDesc>> nonVectorizedOps) {
       this.nonVectorizedOps = nonVectorizedOps;
     }
@@ -428,7 +511,14 @@ public class Vectorizer implements PhysicalPlanResolver {
           scratchTypeNameArray);
       baseWork.setVectorizedRowBatchCtx(vectorizedRowBatchCtx);
 
-      baseWork.setUseVectorizedInputFileFormat(useVectorizedInputFileFormat);
+      if (baseWork instanceof MapWork) {
+        MapWork mapWork = (MapWork) baseWork;
+        mapWork.setUseVectorizedInputFileFormat(useVectorizedInputFileFormat);
+      }
+
+      baseWork.setAllNative(allNative);
+      baseWork.setGroupByVectorOutput(groupByVectorOutput);
+      baseWork.setUsesVectorUDFAdaptor(usesVectorUDFAdaptor);
     }
   }
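The new setNodeIssue/setOperatorIssue/setExpressionIssue helpers above route every validation failure into a VectorizerReason stored on the current BaseWork, which EXPLAIN can later surface. A minimal usage sketch, outside the patch itself and purely illustrative; it assumes VectorizerReason is the public reason class this patch introduces alongside Vectorizer, and that getNotVectorizedReason() is the BaseWork accessor the patch reads:

    import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
    import org.apache.hadoop.hive.ql.optimizer.physical.VectorizerReason;
    import org.apache.hadoop.hive.ql.plan.MapWork;

    public class NotVectorizedReasonSketch {
      public static void main(String[] args) {
        // Point the Vectorizer's issue recording at a work node (via the
        // public test hook added in this patch).
        Vectorizer vectorizer = new Vectorizer();
        MapWork mapWork = new MapWork();
        vectorizer.testSetCurrentBaseWork(mapWork);

        // Validation inside the Vectorizer would call setNodeIssue(...), which
        // stores VectorizerReason.createNodeIssue(issue) on the BaseWork.
        VectorizerReason reason = mapWork.getNotVectorizedReason();
        if (reason == null) {
          System.out.println("Cannot vectorize: unknown");
        } else {
          System.out.println("Cannot vectorize: " + reason.toString());
        }
      }
    }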
@@ -445,17 +535,29 @@
         throws SemanticException {
 
       Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
       if (currTask instanceof MapRedTask) {
-        convertMapWork(((MapRedTask) currTask).getWork().getMapWork(), false);
+        MapredWork mapredWork = ((MapRedTask) currTask).getWork();
+        convertMapWork(mapredWork.getMapWork(), false);
+        ReduceWork reduceWork = mapredWork.getReduceWork();
+        if (reduceWork != null) {
+          // Always set the EXPLAIN conditions.
+          setReduceWorkExplainConditions(reduceWork);
+
+          // We do not vectorize MR Reduce.
+        }
       } else if (currTask instanceof TezTask) {
         TezWork work = ((TezTask) currTask).getWork();
-        for (BaseWork w: work.getAllWork()) {
-          if (w instanceof MapWork) {
-            convertMapWork((MapWork) w, true);
-          } else if (w instanceof ReduceWork) {
-            // We are only vectorizing Reduce under Tez.
-            if (HiveConf.getBoolVar(hiveConf,
-                HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED)) {
-              convertReduceWork((ReduceWork) w, true);
+        for (BaseWork baseWork: work.getAllWork()) {
+          if (baseWork instanceof MapWork) {
+            convertMapWork((MapWork) baseWork, true);
+          } else if (baseWork instanceof ReduceWork) {
+            ReduceWork reduceWork = (ReduceWork) baseWork;
+
+            // Always set the EXPLAIN conditions.
+            setReduceWorkExplainConditions(reduceWork);
+
+            // We are only vectorizing Reduce under Tez/Spark.
+            if (isReduceVectorizationEnabled) {
+              convertReduceWork(reduceWork);
             }
           }
         }
@@ -463,22 +565,43 @@
         SparkWork sparkWork = (SparkWork) currTask.getWork();
         for (BaseWork baseWork : sparkWork.getAllWork()) {
           if (baseWork instanceof MapWork) {
-            convertMapWork((MapWork) baseWork, false);
-          } else if (baseWork instanceof ReduceWork
-              && HiveConf.getBoolVar(hiveConf,
-                  HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED)) {
-            convertReduceWork((ReduceWork) baseWork, false);
+            convertMapWork((MapWork) baseWork, true);
+          } else if (baseWork instanceof ReduceWork) {
+            ReduceWork reduceWork = (ReduceWork) baseWork;
+
+            // Always set the EXPLAIN conditions.
+            setReduceWorkExplainConditions(reduceWork);
+
+            if (isReduceVectorizationEnabled) {
+              convertReduceWork(reduceWork);
+            }
           }
         }
       }
+
       return null;
     }
 
-    private void convertMapWork(MapWork mapWork, boolean isTez) throws SemanticException {
+    private void convertMapWork(MapWork mapWork, boolean isTezOrSpark) throws SemanticException {
+
+      mapWork.setVectorizationExamined(true);
+
+      // Global used when setting errors, etc.
+      currentBaseWork = mapWork;
+
       VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo();
-      boolean ret = validateMapWork(mapWork, vectorTaskColumnInfo, isTez);
+      vectorTaskColumnInfo.assume();
+
+      boolean ret = validateMapWork(mapWork, vectorTaskColumnInfo, isTezOrSpark);
       if (ret) {
-        vectorizeMapWork(mapWork, vectorTaskColumnInfo, isTez);
+        vectorizeMapWork(mapWork, vectorTaskColumnInfo, isTezOrSpark);
+      } else if (currentBaseWork.getVectorizationEnabled()) {
+        VectorizerReason notVectorizedReason = currentBaseWork.getNotVectorizedReason();
+        if (notVectorizedReason == null) {
+          LOG.info("Cannot vectorize: unknown");
+        } else {
+          LOG.info("Cannot vectorize: " + notVectorizedReason.toString());
+        }
       }
     }
@@ -499,6 +622,7 @@
       LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork = mapWork.getAliasToWork();
       if ((aliasToWork == null) || (aliasToWork.size() == 0)) {
+        setNodeIssue("Vectorized map work requires work");
         return null;
       }
       int tableScanCount = 0;
@@ -507,7 +631,7 @@
       for (Entry<String, Operator<? extends OperatorDesc>> entry : aliasToWork.entrySet()) {
         Operator<?> op = entry.getValue();
         if (op == null) {
-          LOG.warn("Map work has invalid aliases to work with. Fail validation!");
+          setNodeIssue("Vectorized map work requires a valid alias");
           return null;
         }
         if (op instanceof TableScanOperator) {
@@ -517,7 +641,7 @@
         }
       }
       if (tableScanCount > 1) {
-        LOG.warn("Map work has more than 1 TableScanOperator. Fail validation!");
+        setNodeIssue("Vectorized map work only works with 1 TableScanOperator");
         return null;
       }
       return new ImmutablePair(alias, tableScanOperator);
@@ -558,22 +682,6 @@
       }
     }
 
-  private String getHiveOptionsString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname);
-    sb.append("=");
-    sb.append(useVectorizedInputFileFormat);
-    sb.append(", ");
-    sb.append(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTOR_DESERIALIZE.varname);
-    sb.append("=");
-    sb.append(useVectorDeserialize);
-    sb.append(", and ");
-    sb.append(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_ROW_DESERIALIZE.varname);
-    sb.append("=");
-    sb.append(useRowDeserialize);
-    return sb.toString();
-  }
-
   /*
    * There are 3 modes of reading for vectorization:
    *
@@ -588,44 +696,58 @@
    * the row object into the VectorizedRowBatch with VectorAssignRow.
    * This picks up Input File Format not supported by the other two.
    */
-  private boolean verifyAndSetVectorPartDesc(PartitionDesc pd, boolean isAcidTable) {
+  private boolean verifyAndSetVectorPartDesc(PartitionDesc pd, boolean isAcidTable,
+      HashSet<String> inputFileFormatClassNameSet, HashSet<String> enabledConditionsMetSet,
+      ArrayList<String> enabledConditionsNotMetList) {
 
     String inputFileFormatClassName = pd.getInputFileFormatClassName();
 
+    // Always collect input file formats.
+    inputFileFormatClassNameSet.add(inputFileFormatClassName);
+
+    boolean isInputFileFormatVectorized = Utilities.isInputFileFormatVectorized(pd);
+
+    if (isAcidTable) {
+
+      // Today, ACID tables are only ORC and that format is vectorizable.  Verify these
+      // assumptions.
+      Preconditions.checkState(isInputFileFormatVectorized);
+      Preconditions.checkState(inputFileFormatClassName.equals(OrcInputFormat.class.getName()));
+
+      if (!useVectorizedInputFileFormat) {
+        enabledConditionsNotMetList.add(
+            "Vectorizing ACID tables requires " + HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname);
+        return false;
+      }
+
+      pd.setVectorPartitionDesc(
+          VectorPartitionDesc.createVectorizedInputFileFormat(
+              inputFileFormatClassName, Utilities.isInputFileFormatSelfDescribing(pd)));
+
+      enabledConditionsMetSet.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname);
+      return true;
+    }
+
     // Look for Pass-Thru case where InputFileFormat has VectorizedInputFormatInterface
     // and reads VectorizedRowBatch as a "row".
 
-    if (isAcidTable || useVectorizedInputFileFormat) {
+    if (useVectorizedInputFileFormat) {
 
-      if (Utilities.isInputFileFormatVectorized(pd)) {
-
-        if (!useVectorizedInputFileFormat) {
-          LOG.info("ACID tables con only be vectorized for the input file format -- " +
-              "i.e. when Hive Configuration option " +
-              HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname +
-              "=true");
-          return false;
-        }
+      if (isInputFileFormatVectorized) {
 
         pd.setVectorPartitionDesc(
             VectorPartitionDesc.createVectorizedInputFileFormat(
                 inputFileFormatClassName, Utilities.isInputFileFormatSelfDescribing(pd)));
 
+        enabledConditionsMetSet.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname);
         return true;
       }
-
-      // Today, ACID tables are only ORC and that format is vectorizable.  Verify this
-      // assumption.
-      Preconditions.checkState(!isAcidTable);
+      // Fall through and look for other options...
     }
 
-    if (!(isSchemaEvolution || isAcidTable) &&
-        (useVectorDeserialize || useRowDeserialize)) {
-      LOG.info("Input format: " + inputFileFormatClassName + " cannot be vectorized" +
-          " when both " + HiveConf.ConfVars.HIVE_SCHEMA_EVOLUTION.varname + "=false and " +
-          " ACID table is " + isAcidTable + " and " +
-          " given the Hive Configuration options " + getHiveOptionsString());
-      return false;
+    if (!isSchemaEvolution) {
+      enabledConditionsNotMetList.add(
+          "Vectorizing tables without Schema Evolution requires " + HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname);
     }
 
     String deserializerClassName = pd.getDeserializerClassName();
@@ -635,6 +757,12 @@
     //
     // Do the "vectorized" row-by-row deserialization into a VectorizedRowBatch in the
     // VectorMapOperator.
+    boolean isTextFormat = inputFileFormatClassName.equals(TextInputFormat.class.getName()) &&
+        deserializerClassName.equals(LazySimpleSerDe.class.getName());
+    boolean isSequenceFormat =
+        inputFileFormatClassName.equals(SequenceFileInputFormat.class.getName()) &&
+        deserializerClassName.equals(LazyBinarySerDe.class.getName());
+    boolean isVectorDeserializeEligable = isTextFormat || isSequenceFormat;
 
     if (useVectorDeserialize) {
 
@@ -648,8 +776,7 @@
       //    org.apache.hadoop.mapred.SequenceFileInputFormat
       //    org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
 
-      if (inputFileFormatClassName.equals(TextInputFormat.class.getName()) &&
-          deserializerClassName.equals(LazySimpleSerDe.class.getName())) {
+      if (isTextFormat) {
 
         Properties properties = pd.getTableDesc().getProperties();
         String lastColumnTakesRestString =
@@ -659,10 +786,11 @@
             lastColumnTakesRestString.equalsIgnoreCase("true"));
         if (lastColumnTakesRest) {
 
-          // If row mode will not catch this, then inform.
+          // If row mode will not catch this input file format, then not enabled.
           if (useRowDeserialize) {
-            LOG.info("Input format: " + inputFileFormatClassName + " cannot be vectorized" +
-                " when " + serdeConstants.SERIALIZATION_LAST_COLUMN_TAKES_REST + "is true");
+            enabledConditionsNotMetList.add(
+                inputFileFormatClassName + " " +
+                serdeConstants.SERIALIZATION_LAST_COLUMN_TAKES_REST + " must be disabled ");
             return false;
           }
         } else {
@@ -670,17 +798,19 @@
               VectorPartitionDesc.createVectorDeserialize(
                   inputFileFormatClassName, VectorDeserializeType.LAZY_SIMPLE));
 
+          enabledConditionsMetSet.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTOR_DESERIALIZE.varname);
           return true;
         }
-      } else if (inputFileFormatClassName.equals(SequenceFileInputFormat.class.getName()) &&
-          deserializerClassName.equals(LazyBinarySerDe.class.getName())) {
+      } else if (isSequenceFormat) {
 
         pd.setVectorPartitionDesc(
             VectorPartitionDesc.createVectorDeserialize(
                 inputFileFormatClassName, VectorDeserializeType.LAZY_BINARY));
 
+        enabledConditionsMetSet.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTOR_DESERIALIZE.varname);
         return true;
       }
+      // Fall through and look for other options...
     }
 
     // Otherwise, if enabled, deserialize rows using regular Serde and add the object
@@ -694,17 +824,29 @@
             Utilities.isInputFileFormatSelfDescribing(pd),
             deserializerClassName));
 
+      enabledConditionsMetSet.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_ROW_DESERIALIZE.varname);
       return true;
     }
 
-    LOG.info("Input format: " + inputFileFormatClassName + " cannot be vectorized" +
-        " given the Hive Configuration options " + getHiveOptionsString());
-
+    if (isInputFileFormatVectorized) {
+      Preconditions.checkState(!useVectorizedInputFileFormat);
+      enabledConditionsNotMetList.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT.varname);
+    } else {
+      // Only offer these when the input file format is not the fast vectorized formats.
+      if (isVectorDeserializeEligable) {
+        Preconditions.checkState(!useVectorDeserialize);
+        enabledConditionsNotMetList.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTOR_DESERIALIZE.varname);
+      } else {
+        // Since row mode takes everyone.
+        enabledConditionsNotMetList.add(HiveConf.ConfVars.HIVE_VECTORIZATION_USE_ROW_DESERIALIZE.varname);
+      }
+    }
 
     return false;
   }
 
-  private boolean validateInputFormatAndSchemaEvolution(MapWork mapWork, String alias,
+  private ImmutablePair<Boolean, Boolean> validateInputFormatAndSchemaEvolution(MapWork mapWork, String alias,
       TableScanOperator tableScanOperator, VectorTaskColumnInfo vectorTaskColumnInfo)
           throws SemanticException {
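verifyAndSetVectorPartDesc chooses among the three read modes strictly from the use* flags the Vectorizer collected from configuration. A small sketch, not part of the patch, of probing those same flags from a HiveConf; it assumes nothing beyond the ConfVars named in the diff above:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class ReadModeFlags {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // The three flags steering verifyAndSetVectorPartDesc, in priority order:
        // pass-through vectorized input format, vector deserialize, row deserialize.
        boolean useVectorizedInputFileFormat = HiveConf.getBoolVar(conf,
            HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTORIZED_INPUT_FILE_FORMAT);
        boolean useVectorDeserialize = HiveConf.getBoolVar(conf,
            HiveConf.ConfVars.HIVE_VECTORIZATION_USE_VECTOR_DESERIALIZE);
        boolean useRowDeserialize = HiveConf.getBoolVar(conf,
            HiveConf.ConfVars.HIVE_VECTORIZATION_USE_ROW_DESERIALIZE);
        System.out.println("pass-through=" + useVectorizedInputFileFormat
            + ", vector-deserialize=" + useVectorDeserialize
            + ", row-deserialize=" + useRowDeserialize);
      }
    }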
@@ -732,27 +874,39 @@
 
     LinkedHashMap<Path, ArrayList<String>> pathToAliases = mapWork.getPathToAliases();
     LinkedHashMap<Path, PartitionDesc> pathToPartitionInfo = mapWork.getPathToPartitionInfo();
+
+    // Remember the input file formats we validated and why.
+    HashSet<String> inputFileFormatClassNameSet = new HashSet<String>();
+    HashSet<String> enabledConditionsMetSet = new HashSet<String>();
+    ArrayList<String> enabledConditionsNotMetList = new ArrayList<String>();
+
     for (Entry<Path, ArrayList<String>> entry: pathToAliases.entrySet()) {
       Path path = entry.getKey();
       List<String> aliases = entry.getValue();
       boolean isPresent = (aliases != null && aliases.indexOf(alias) != -1);
       if (!isPresent) {
-        LOG.info("Alias " + alias + " not present in aliases " + aliases);
-        return false;
+        setOperatorIssue("Alias " + alias + " not present in aliases " + aliases);
+        return new ImmutablePair<Boolean,Boolean>(false, false);
       }
       PartitionDesc partDesc = pathToPartitionInfo.get(path);
       if (partDesc.getVectorPartitionDesc() != null) {
         // We seen this already.
         continue;
       }
-      if (!verifyAndSetVectorPartDesc(partDesc, isAcidTable)) {
-        return false;
+      if (!verifyAndSetVectorPartDesc(partDesc, isAcidTable, inputFileFormatClassNameSet,
+          enabledConditionsMetSet, enabledConditionsNotMetList)) {
+
+        // Always set these so EXPLAIN can see.
+        mapWork.setVectorizationInputFileFormatClassNameSet(inputFileFormatClassNameSet);
+        mapWork.setVectorizationEnabledConditionsMet(new ArrayList(enabledConditionsMetSet));
+        mapWork.setVectorizationEnabledConditionsNotMet(enabledConditionsNotMetList);
+
+        // We consider this an enable issue, not a not vectorized issue.
+        LOG.info("Cannot enable vectorization because input file format(s) " + inputFileFormatClassNameSet +
+            " do not met conditions " + VectorizationCondition.addBooleans(enabledConditionsNotMetList, false));
+        return new ImmutablePair<Boolean,Boolean>(false, true);
       }
       VectorPartitionDesc vectorPartDesc = partDesc.getVectorPartitionDesc();
-      if (LOG.isInfoEnabled()) {
-        LOG.info("Vectorizer path: " + path + ", " + vectorPartDesc.toString() +
-            ", aliases " + aliases);
-      }
 
       if (isFirst) {
 
@@ -796,13 +950,13 @@
          * implicitly defaulted to null.
          */
         if (nextDataColumnList.size() > tableDataColumnList.size()) {
-          LOG.info(
+          setOperatorIssue(
               String.format(
                   "Could not vectorize partition %s " +
                   "(deserializer " + deserializer.getClass().getName() + ")" +
                   "The partition column names %d is greater than the number of table columns %d",
                   path, nextDataColumnList.size(), tableDataColumnList.size()));
-          return false;
+          return new ImmutablePair<Boolean,Boolean>(false, false);
         }
         if (!(deserializer instanceof NullStructSerDe)) {
 
@@ -811,13 +965,13 @@
             String nextColumnName = nextDataColumnList.get(i);
             String tableColumnName = tableDataColumnList.get(i);
             if (!nextColumnName.equals(tableColumnName)) {
-              LOG.info(
+              setOperatorIssue(
                   String.format(
                       "Could not vectorize partition %s " +
                       "(deserializer " + deserializer.getClass().getName() + ")" +
                       "The partition column name %s is does not match table column name %s",
                       path, nextColumnName, tableColumnName));
-              return false;
+              return new ImmutablePair<Boolean,Boolean>(false, false);
             }
           }
         }
@@ -852,29 +1006,50 @@
 
     // Helps to keep this for debugging.
     vectorTaskColumnInfo.setTableScanOperator(tableScanOperator);
 
-    return true;
+    // Always set these so EXPLAIN can see.
+    mapWork.setVectorizationInputFileFormatClassNameSet(inputFileFormatClassNameSet);
+    mapWork.setVectorizationEnabledConditionsMet(new ArrayList(enabledConditionsMetSet));
+    mapWork.setVectorizationEnabledConditionsNotMet(enabledConditionsNotMetList);
+
+    return new ImmutablePair<Boolean,Boolean>(true, false);
   }
 
-  private boolean validateMapWork(MapWork mapWork, VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez)
+  private boolean validateMapWork(MapWork mapWork, VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTezOrSpark)
           throws SemanticException {
 
     LOG.info("Validating MapWork...");
 
-    ImmutablePair<String,TableScanOperator> pair = verifyOnlyOneTableScanOperator(mapWork);
-    if (pair ==  null) {
+    ImmutablePair<String,TableScanOperator> onlyOneTableScanPair = verifyOnlyOneTableScanOperator(mapWork);
+    if (onlyOneTableScanPair == null) {
+      VectorizerReason notVectorizedReason = currentBaseWork.getNotVectorizedReason();
+      Preconditions.checkState(notVectorizedReason != null);
+      mapWork.setVectorizationEnabledConditionsNotMet(Arrays.asList(new String[] {notVectorizedReason.toString()}));
       return false;
     }
-    String alias = pair.left;
-    TableScanOperator tableScanOperator = pair.right;
+    String alias = onlyOneTableScanPair.left;
+    TableScanOperator tableScanOperator = onlyOneTableScanPair.right;
 
     // This call fills in the column names, types, and partition column count in
     // vectorTaskColumnInfo.
-    if (!validateInputFormatAndSchemaEvolution(mapWork, alias, tableScanOperator, vectorTaskColumnInfo)) {
+    currentOperator = tableScanOperator;
+    ImmutablePair<Boolean, Boolean> validateInputFormatAndSchemaEvolutionPair =
+        validateInputFormatAndSchemaEvolution(mapWork, alias, tableScanOperator, vectorTaskColumnInfo);
+    if (!validateInputFormatAndSchemaEvolutionPair.left) {
+      // Have we already set the enabled conditions not met?
+      if (!validateInputFormatAndSchemaEvolutionPair.right) {
+        VectorizerReason notVectorizedReason = currentBaseWork.getNotVectorizedReason();
+        Preconditions.checkState(notVectorizedReason != null);
+        mapWork.setVectorizationEnabledConditionsNotMet(Arrays.asList(new String[] {notVectorizedReason.toString()}));
+      }
       return false;
     }
 
+    // Now we are enabled and any issues found from here on out are considered
+    // not vectorized issues.
+    mapWork.setVectorizationEnabled(true);
+
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-    MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor(mapWork, isTez);
+    MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor(mapWork, isTezOrSpark);
     addMapWorkRules(opRules, vnp);
     Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
     GraphWalker ogw = new DefaultGraphWalker(disp);
@@ -896,13 +1071,13 @@
   }
 
   private void vectorizeMapWork(MapWork mapWork, VectorTaskColumnInfo vectorTaskColumnInfo,
-      boolean isTez) throws SemanticException {
+      boolean isTezOrSpark) throws SemanticException {
 
     LOG.info("Vectorizing MapWork...");
     mapWork.setVectorMode(true);
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
     MapWorkVectorizationNodeProcessor vnp =
-        new MapWorkVectorizationNodeProcessor(mapWork, isTez, vectorTaskColumnInfo);
+        new MapWorkVectorizationNodeProcessor(mapWork, isTezOrSpark, vectorTaskColumnInfo);
     addMapWorkRules(opRules, vnp);
     Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
     GraphWalker ogw = new PreOrderOnceWalker(disp);
@@ -923,11 +1098,34 @@
     return;
   }
 
-  private void convertReduceWork(ReduceWork reduceWork, boolean isTez) throws SemanticException {
+  private void setReduceWorkExplainConditions(ReduceWork reduceWork) {
+
+    reduceWork.setVectorizationExamined(true);
+
+    reduceWork.setReduceVectorizationEnabled(isReduceVectorizationEnabled);
+    reduceWork.setVectorReduceEngine(
+        HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE));
+  }
+
+  private void convertReduceWork(ReduceWork reduceWork) throws SemanticException {
+
+    // Global used when setting errors, etc.
+    currentBaseWork = reduceWork;
+    currentBaseWork.setVectorizationEnabled(true);
+
     VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo();
-    boolean ret = validateReduceWork(reduceWork, vectorTaskColumnInfo, isTez);
+    vectorTaskColumnInfo.assume();
+
+    boolean ret = validateReduceWork(reduceWork, vectorTaskColumnInfo);
     if (ret) {
-      vectorizeReduceWork(reduceWork, vectorTaskColumnInfo, isTez);
+      vectorizeReduceWork(reduceWork, vectorTaskColumnInfo);
+    } else if (currentBaseWork.getVectorizationEnabled()) {
+      VectorizerReason notVectorizedReason = currentBaseWork.getNotVectorizedReason();
+      if (notVectorizedReason == null) {
+        LOG.info("Cannot vectorize: unknown");
+      } else {
+        LOG.info("Cannot vectorize: " + notVectorizedReason.toString());
+      }
     }
   }
 
@@ -941,13 +1139,14 @@
     // Check key ObjectInspector.
     ObjectInspector keyObjectInspector = reduceWork.getKeyObjectInspector();
     if (keyObjectInspector == null || !(keyObjectInspector instanceof StructObjectInspector)) {
+      setNodeIssue("Key object inspector missing or not StructObjectInspector");
       return false;
     }
     StructObjectInspector keyStructObjectInspector = (StructObjectInspector)keyObjectInspector;
     List<? extends StructField> keyFields = keyStructObjectInspector.getAllStructFieldRefs();
 
-    // Tez doesn't use tagging...
     if (reduceWork.getNeedsTagging()) {
+      setNodeIssue("Tez doesn't use tagging");
       return false;
     }
 
@@ -955,6 +1154,7 @@
     ObjectInspector valueObjectInspector = reduceWork.getValueObjectInspector();
     if (valueObjectInspector == null ||
             !(valueObjectInspector instanceof StructObjectInspector)) {
+      setNodeIssue("Value object inspector missing or not StructObjectInspector");
       return false;
     }
     StructObjectInspector valueStructObjectInspector = (StructObjectInspector)valueObjectInspector;
@@ -984,7 +1184,7 @@
   }
 
   private boolean validateReduceWork(ReduceWork reduceWork,
-      VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez) throws SemanticException {
+      VectorTaskColumnInfo vectorTaskColumnInfo) throws SemanticException {
 
     LOG.info("Validating ReduceWork...");
 
@@ -1015,7 +1215,7 @@
   }
 
   private void vectorizeReduceWork(ReduceWork reduceWork,
-      VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez) throws SemanticException {
+      VectorTaskColumnInfo vectorTaskColumnInfo) throws SemanticException {
 
     LOG.info("Vectorizing ReduceWork...");
     reduceWork.setVectorMode(true);
@@ -1025,7 +1225,7 @@
     // VectorizationContext...  Do we use PreOrderWalker instead of DefaultGraphWalker.
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
     ReduceWorkVectorizationNodeProcessor vnp =
-        new ReduceWorkVectorizationNodeProcessor(vectorTaskColumnInfo, isTez);
+        new ReduceWorkVectorizationNodeProcessor(vectorTaskColumnInfo);
     addReduceWorkRules(opRules, vnp);
     Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
     GraphWalker ogw = new PreOrderWalker(disp);
@@ -1053,7 +1253,7 @@
   class MapWorkValidationNodeProcessor implements NodeProcessor {
 
     private final MapWork mapWork;
-    private final boolean isTez;
+    private final boolean isTezOrSpark;
 
     // Children of Vectorized GROUPBY that outputs rows instead of vectorized row batchs.
     protected final Set<Operator<? extends OperatorDesc>> nonVectorizedOps =
@@ -1063,9 +1263,9 @@
       return nonVectorizedOps;
     }
 
-    public MapWorkValidationNodeProcessor(MapWork mapWork, boolean isTez) {
+    public MapWorkValidationNodeProcessor(MapWork mapWork, boolean isTezOrSpark) {
       this.mapWork = mapWork;
-      this.isTez = isTez;
+      this.isTezOrSpark = isTezOrSpark;
     }
 
     @Override
@@ -1077,13 +1277,13 @@
           return new Boolean(true);
         }
         boolean ret;
+        currentOperator = op;
        try {
-          ret = validateMapWorkOperator(op, mapWork, isTez);
+          ret = validateMapWorkOperator(op, mapWork, isTezOrSpark);
        } catch (Exception e) {
          throw new SemanticException(e);
        }
        if (!ret) {
-          LOG.info("MapWork Operator: " + op.getName() + " could not be vectorized.");
          return new Boolean(false);
        }
        // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't
@@ -1119,9 +1319,9 @@
        if (nonVectorizedOps.contains(op)) {
          return new Boolean(true);
        }
+        currentOperator = op;
        boolean ret = validateReduceWorkOperator(op);
        if (!ret) {
-          LOG.info("ReduceWork Operator: " + op.getName() + " could not be vectorized.");
          return new Boolean(false);
        }
        // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't
@@ -1142,9 +1342,12 @@
     // The vectorization context for the Map or Reduce task.
     protected VectorizationContext taskVectorizationContext;
 
+    protected final VectorTaskColumnInfo vectorTaskColumnInfo;
     protected final Set<Operator<? extends OperatorDesc>> nonVectorizedOps;
 
-    VectorizationNodeProcessor(Set<Operator<? extends OperatorDesc>> nonVectorizedOps) {
+    VectorizationNodeProcessor(VectorTaskColumnInfo vectorTaskColumnInfo,
+        Set<Operator<? extends OperatorDesc>> nonVectorizedOps) {
+      this.vectorTaskColumnInfo = vectorTaskColumnInfo;
       this.nonVectorizedOps = nonVectorizedOps;
     }
 
@@ -1192,11 +1395,11 @@
     }
 
     public Operator<? extends OperatorDesc> doVectorize(Operator<? extends OperatorDesc> op,
-        VectorizationContext vContext, boolean isTez) throws SemanticException {
+        VectorizationContext vContext, boolean isTezOrSpark) throws SemanticException {
       Operator<? extends OperatorDesc> vectorOp = op;
       try {
         if (!opsDone.contains(op)) {
-          vectorOp = vectorizeOperator(op, vContext, isTez);
+          vectorOp = vectorizeOperator(op, vContext, isTezOrSpark, vectorTaskColumnInfo);
           opsDone.add(op);
           if (vectorOp != op) {
             opToVectorOpMap.put(op, vectorOp);
@@ -1220,14 +1423,14 @@
 
     private final MapWork mWork;
     private final VectorTaskColumnInfo vectorTaskColumnInfo;
-    private final boolean isTez;
+    private final boolean isTezOrSpark;
 
-    public MapWorkVectorizationNodeProcessor(MapWork mWork, boolean isTez,
+    public MapWorkVectorizationNodeProcessor(MapWork mWork, boolean isTezOrSpark,
         VectorTaskColumnInfo vectorTaskColumnInfo) {
-      super(vectorTaskColumnInfo.getNonVectorizedOps());
+      super(vectorTaskColumnInfo, vectorTaskColumnInfo.getNonVectorizedOps());
       this.mWork = mWork;
      this.vectorTaskColumnInfo = vectorTaskColumnInfo;
-      this.isTez = isTez;
+      this.isTezOrSpark = isTezOrSpark;
    }
 
    @Override
@@ -1241,6 +1444,7 @@
 
       VectorizationContext vContext = null;
 
+      currentOperator = op;
       if (op instanceof TableScanOperator) {
         if (taskVectorizationContext == null) {
           taskVectorizationContext = getVectorizationContext(op.getName(), vectorTaskColumnInfo);
@@ -1261,7 +1465,7 @@
             + " using vectorization context" + vContext.toString());
       }
 
-      Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext, isTez);
+      Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext, isTezOrSpark);
 
       if (LOG.isDebugEnabled()) {
         if (vectorOp instanceof VectorizationContextRegion) {
@@ -1279,7 +1483,6 @@
 
     private final VectorTaskColumnInfo vectorTaskColumnInfo;
 
-    private final boolean isTez;
 
     private Operator<? extends OperatorDesc> rootVectorOp;
 
@@ -1287,13 +1490,11 @@
       return rootVectorOp;
     }
 
-    public ReduceWorkVectorizationNodeProcessor(VectorTaskColumnInfo vectorTaskColumnInfo,
-        boolean isTez) {
+    public ReduceWorkVectorizationNodeProcessor(VectorTaskColumnInfo vectorTaskColumnInfo) {
 
-      super(vectorTaskColumnInfo.getNonVectorizedOps());
+      super(vectorTaskColumnInfo, vectorTaskColumnInfo.getNonVectorizedOps());
       this.vectorTaskColumnInfo =  vectorTaskColumnInfo;
       rootVectorOp = null;
-      this.isTez = isTez;
     }
 
     @Override
@@ -1309,6 +1510,7 @@
 
       boolean saveRootVectorOp = false;
 
+      currentOperator = op;
       if (op.getParentOperators().size() == 0) {
         LOG.info("ReduceWorkVectorizationNodeProcessor process reduceColumnNames " +
             vectorTaskColumnInfo.allColumnNames.toString());
@@ -1333,7 +1535,7 @@
       assert vContext != null;
       LOG.info("ReduceWorkVectorizationNodeProcessor process operator " + op.getName() +
           " using vectorization context" + vContext.toString());
 
-      Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext, isTez);
+      Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext, true);
 
       if (LOG.isDebugEnabled()) {
         if (vectorOp instanceof VectorizationContextRegion) {
@@ -1390,6 +1592,10 @@
         HiveConf.getBoolVar(hiveConf,
             HiveConf.ConfVars.HIVE_VECTORIZATION_USE_ROW_DESERIALIZE);
 
+    isReduceVectorizationEnabled =
+        HiveConf.getBoolVar(hiveConf,
+            HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED);
+
     isSchemaEvolution =
         HiveConf.getBoolVar(hiveConf,
             HiveConf.ConfVars.HIVE_SCHEMA_EVOLUTION);
@@ -1407,18 +1613,32 @@
     return physicalContext;
   }
 
-  boolean validateMapWorkOperator(Operator<? extends OperatorDesc> op, MapWork mWork, boolean isTez) {
-    boolean ret = false;
+  private void setOperatorNotSupported(Operator<? extends OperatorDesc> op) {
+    OperatorDesc desc = op.getConf();
+    Annotation note = AnnotationUtils.getAnnotation(desc.getClass(), Explain.class);
+    if (note != null) {
+      Explain explainNote = (Explain) note;
+      setNodeIssue(explainNote.displayName() + " (" + op.getType() + ") not supported");
+    } else {
+      setNodeIssue("Operator " + op.getType() + " not supported");
+    }
+  }
+
+  boolean validateMapWorkOperator(Operator<? extends OperatorDesc> op, MapWork mWork, boolean isTezOrSpark) {
+    boolean ret;
     switch (op.getType()) {
       case MAPJOIN:
         if (op instanceof MapJoinOperator) {
           ret = validateMapJoinOperator((MapJoinOperator) op);
         } else if (op instanceof SMBMapJoinOperator) {
           ret = validateSMBMapJoinOperator((SMBMapJoinOperator) op);
+        } else {
+          setOperatorNotSupported(op);
+          ret = false;
         }
         break;
       case GROUPBY:
-        ret = validateGroupByOperator((GroupByOperator) op, false, isTez);
+        ret = validateGroupByOperator((GroupByOperator) op, false, isTezOrSpark);
         break;
       case FILTER:
         ret = validateFilterOperator((FilterOperator) op);
@@ -1443,6 +1663,7 @@
             validateSparkHashTableSinkOperator((SparkHashTableSinkOperator) op);
         break;
       default:
+        setOperatorNotSupported(op);
         ret = false;
         break;
     }
@@ -1450,7 +1671,7 @@
   }
 
   boolean validateReduceWorkOperator(Operator<? extends OperatorDesc> op) {
-    boolean ret = false;
+    boolean ret;
     switch (op.getType()) {
       case MAPJOIN:
        // Does MAPJOIN actually get planned in Reduce?
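The new setOperatorNotSupported above recovers a human-readable operator name by reading the class-level @Explain annotation from the operator's descriptor. A small sketch of that lookup, not part of the patch; SelectDesc is just an assumed example of an annotated descriptor class, while AnnotationUtils and Explain are the classes this patch imports:

    import java.lang.annotation.Annotation;
    import org.apache.hadoop.hive.ql.plan.Explain;
    import org.apache.hadoop.hive.ql.plan.SelectDesc;
    import org.apache.hive.common.util.AnnotationUtils;

    public class ExplainNameProbe {
      public static void main(String[] args) {
        // Fetch the class-level @Explain annotation, if present.
        Annotation note = AnnotationUtils.getAnnotation(SelectDesc.class, Explain.class);
        if (note != null) {
          // displayName() is the title EXPLAIN prints for this operator.
          System.out.println(((Explain) note).displayName());
        } else {
          System.out.println("no @Explain annotation");
        }
      }
    }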
@@ -1458,6 +1679,9 @@ public class Vectorizer implements PhysicalPlanResolver { ret = validateMapJoinOperator((MapJoinOperator) op); } else if (op instanceof SMBMapJoinOperator) { ret = validateSMBMapJoinOperator((SMBMapJoinOperator) op); + } else { + setOperatorNotSupported(op); + ret = false; } break; case GROUPBY: @@ -1465,6 +1689,7 @@ public class Vectorizer implements PhysicalPlanResolver { HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_GROUPBY_ENABLED)) { ret = validateGroupByOperator((GroupByOperator) op, true, true); } else { + setNodeIssue("Operator " + op.getType() + " not enabled (" + HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_GROUPBY_ENABLED.name() + "=true IS false)"); ret = false; } break; @@ -1490,6 +1715,7 @@ public class Vectorizer implements PhysicalPlanResolver { validateSparkHashTableSinkOperator((SparkHashTableSinkOperator) op); break; default: + setOperatorNotSupported(op); ret = false; break; } @@ -1512,7 +1738,7 @@ public class Vectorizer implements PhysicalPlanResolver { throws SemanticException { if (op.getType().equals(OperatorType.GROUPBY)) { GroupByDesc desc = (GroupByDesc) op.getConf(); - return !desc.getVectorDesc().isVectorOutput(); + return !((VectorGroupByDesc) desc.getVectorDesc()).isVectorOutput(); } return false; } @@ -1526,6 +1752,7 @@ public class Vectorizer implements PhysicalPlanResolver { private boolean validateTableScanOperator(TableScanOperator op, MapWork mWork) { TableScanDesc desc = op.getConf(); if (desc.isGatherStats()) { + setOperatorIssue("gather stats not supported"); return false; } @@ -1540,25 +1767,21 @@ public class Vectorizer implements PhysicalPlanResolver { private boolean validateMapJoinDesc(MapJoinDesc desc) { byte posBigTable = (byte) desc.getPosBigTable(); List<ExprNodeDesc> filterExprs = desc.getFilters().get(posBigTable); - if (!validateExprNodeDesc(filterExprs, VectorExpressionDescriptor.Mode.FILTER)) { - LOG.info("Cannot vectorize map work filter expression"); + if (!validateExprNodeDesc(filterExprs, "Filter", VectorExpressionDescriptor.Mode.FILTER)) { return false; } List<ExprNodeDesc> keyExprs = desc.getKeys().get(posBigTable); - if (!validateExprNodeDesc(keyExprs)) { - LOG.info("Cannot vectorize map work key expression"); + if (!validateExprNodeDesc(keyExprs, "Key")) { return false; } List<ExprNodeDesc> valueExprs = desc.getExprs().get(posBigTable); - if (!validateExprNodeDesc(valueExprs)) { - LOG.info("Cannot vectorize map work value expression"); + if (!validateExprNodeDesc(valueExprs, "Value")) { return false; } Byte[] order = desc.getTagOrder(); Byte posSingleVectorMapJoinSmallTable = (order[0] == posBigTable ? 
order[1] : order[0]); List<ExprNodeDesc> smallTableExprs = desc.getExprs().get(posSingleVectorMapJoinSmallTable); - if (!validateExprNodeDesc(smallTableExprs)) { - LOG.info("Cannot vectorize map work small table expression"); + if (!validateExprNodeDesc(smallTableExprs, "Small Table")) { return false; } return true; @@ -1571,24 +1794,23 @@ public class Vectorizer implements PhysicalPlanResolver { List<ExprNodeDesc> filterExprs = desc.getFilters().get(tag); List<ExprNodeDesc> keyExprs = desc.getKeys().get(tag); List<ExprNodeDesc> valueExprs = desc.getExprs().get(tag); - return validateExprNodeDesc(filterExprs, VectorExpressionDescriptor.Mode.FILTER) && - validateExprNodeDesc(keyExprs) && validateExprNodeDesc(valueExprs); + return validateExprNodeDesc(filterExprs, "Filter", VectorExpressionDescriptor.Mode.FILTER) && + validateExprNodeDesc(keyExprs, "Key") && validateExprNodeDesc(valueExprs, "Value"); } private boolean validateReduceSinkOperator(ReduceSinkOperator op) { List<ExprNodeDesc> keyDescs = op.getConf().getKeyCols(); List<ExprNodeDesc> partitionDescs = op.getConf().getPartitionCols(); List<ExprNodeDesc> valueDesc = op.getConf().getValueCols(); - return validateExprNodeDesc(keyDescs) && validateExprNodeDesc(partitionDescs) && - validateExprNodeDesc(valueDesc); + return validateExprNodeDesc(keyDescs, "Key") && validateExprNodeDesc(partitionDescs, "Partition") && + validateExprNodeDesc(valueDesc, "Value"); } private boolean validateSelectOperator(SelectOperator op) { List<ExprNodeDesc> descList = op.getConf().getColList(); for (ExprNodeDesc desc : descList) { - boolean ret = validateExprNodeDesc(desc); + boolean ret = validateExprNodeDesc(desc, "Select"); if (!ret) { - LOG.info("Cannot vectorize select expression: " + desc.toString()); return false; } } @@ -1597,28 +1819,26 @@ public class Vectorizer implements PhysicalPlanResolver { private boolean validateFilterOperator(FilterOperator op) { ExprNodeDesc desc = op.getConf().getPredicate(); - return validateExprNodeDesc(desc, VectorExpressionDescriptor.Mode.FILTER); + return validateExprNodeDesc(desc, "Predicate", VectorExpressionDescriptor.Mode.FILTER); } - private boolean validateGroupByOperator(GroupByOperator op, boolean isReduce, boolean isTez) { + private boolean validateGroupByOperator(GroupByOperator op, boolean isReduce, boolean isTezOrSpark) { GroupByDesc desc = op.getConf(); - VectorGroupByDesc vectorDesc = desc.getVectorDesc(); if (desc.isGroupingSetsPresent()) { - LOG.info("Grouping sets not supported in vector mode"); + setOperatorIssue("Grouping sets not supported"); return false; } if (desc.pruneGroupingSetId()) { - LOG.info("Pruning grouping set id not supported in vector mode"); + setOperatorIssue("Pruning grouping set id not supported"); return false; } if (desc.getMode() != GroupByDesc.Mode.HASH && desc.isDistinct()) { - LOG.info("DISTINCT not supported in vector mode"); + setOperatorIssue("DISTINCT not supported"); return false; } - boolean ret = validateExprNodeDesc(desc.getKeys()); + boolean ret = validateExprNodeDesc(desc.getKeys(), "Key"); if (!ret) { - LOG.info("Cannot vectorize groupby key expression " + desc.getKeys().toString()); return false; } @@ -1731,6 +1951,9 @@ public class Vectorizer implements PhysicalPlanResolver { // If all the aggregation outputs are primitive, we can output VectorizedRowBatch. // Otherwise, we the rest of the operator tree will be row mode. 
+ VectorGroupByDesc vectorDesc = new VectorGroupByDesc(); + desc.setVectorDesc(vectorDesc); + vectorDesc.setVectorOutput(retPair.right); vectorDesc.setProcessingMode(processingMode); @@ -1745,14 +1968,15 @@ public class Vectorizer implements PhysicalPlanResolver { return true; } - private boolean validateExprNodeDesc(List<ExprNodeDesc> descs) { - return validateExprNodeDesc(descs, VectorExpressionDescriptor.Mode.PROJECTION); + private boolean validateExprNodeDesc(List<ExprNodeDesc> descs, String expressionTitle) { + return validateExprNodeDesc(descs, expressionTitle, VectorExpressionDescriptor.Mode.PROJECTION); } private boolean validateExprNodeDesc(List<ExprNodeDesc> descs, + String expressionTitle, VectorExpressionDescriptor.Mode mode) { for (ExprNodeDesc d : descs) { - boolean ret = validateExprNodeDesc(d, mode); + boolean ret = validateExprNodeDesc(d, expressionTitle, mode); if (!ret) { return false; } @@ -1775,19 +1999,20 @@ public class Vectorizer implements PhysicalPlanResolver { return new Pair<Boolean, Boolean>(true, outputIsPrimitive); } - private boolean validateExprNodeDescRecursive(ExprNodeDesc desc, VectorExpressionDescriptor.Mode mode) { + private boolean validateExprNodeDescRecursive(ExprNodeDesc desc, String expressionTitle, + VectorExpressionDescriptor.Mode mode) { if (desc instanceof ExprNodeColumnDesc) { ExprNodeColumnDesc c = (ExprNodeColumnDesc) desc; // Currently, we do not support vectorized virtual columns (see HIVE-5570). if (VirtualColumn.VIRTUAL_COLUMN_NAMES.contains(c.getColumn())) { - LOG.info("Cannot vectorize virtual column " + c.getColumn()); + setExpressionIssue(expressionTitle, "Virtual columns not supported (" + c.getColumn() + ")"); return false; } } String typeName = desc.getTypeInfo().getTypeName(); boolean ret = validateDataType(typeName, mode); if (!ret) { - LOG.info("Cannot vectorize " + desc.toString() + " of type " + typeName); + setExpressionIssue(expressionTitle, "Data type " + typeName + " of " + desc.toString() + " not supported"); return false; } boolean isInExpression = false; @@ -1795,7 +2020,7 @@ public class Vectorizer implements PhysicalPlanResolver { ExprNodeGenericFuncDesc d = (ExprNodeGenericFuncDesc) desc; boolean r = validateGenericUdf(d); if (!r) { - LOG.info("Cannot vectorize UDF " + d); + setExpressionIssue(expressionTitle, "UDF " + d + " not supported"); return false; } GenericUDF genericUDF = d.getGenericUDF(); @@ -1806,14 +2031,14 @@ public class Vectorizer implements PhysicalPlanResolver { && desc.getChildren().get(0).getTypeInfo().getCategory() == Category.STRUCT) { // Don't restrict child expressions for projection. // Always use loose FILTER mode. - if (!validateStructInExpression(desc, VectorExpressionDescriptor.Mode.FILTER)) { + if (!validateStructInExpression(desc, expressionTitle, VectorExpressionDescriptor.Mode.FILTER)) { return false; } } else { for (ExprNodeDesc d : desc.getChildren()) { // Don't restrict child expressions for projection. // Always use loose FILTER mode. 
- if (!validateExprNodeDescRecursive(d, VectorExpressionDescriptor.Mode.FILTER)) { + if (!validateExprNodeDescRecursive(d, expressionTitle, VectorExpressionDescriptor.Mode.FILTER)) { return false; } } @@ -1823,7 +2048,7 @@ public class Vectorizer implements PhysicalPlanResolver { } private boolean validateStructInExpression(ExprNodeDesc desc, - VectorExpressionDescriptor.Mode mode) { + String expressionTitle, VectorExpressionDescriptor.Mode mode) { for (ExprNodeDesc d : desc.getChildren()) { TypeInfo typeInfo = d.getTypeInfo(); if (typeInfo.getCategory() != Category.STRUCT) { @@ -1839,7 +2064,8 @@ public class Vectorizer implements PhysicalPlanResolver { TypeInfo fieldTypeInfo = fieldTypeInfos.get(f); Category category = fieldTypeInfo.getCategory(); if (category != Category.PRIMITIVE) { - LOG.info("Cannot vectorize struct field " + fieldNames.get(f) + setExpressionIssue(expressionTitle, + "Cannot vectorize struct field " + fieldNames.get(f) + " of type " + fieldTypeInfo.getTypeName()); return false; } @@ -1852,7 +2078,8 @@ public class Vectorizer implements PhysicalPlanResolver { if (inConstantType != InConstantType.INT_FAMILY && inConstantType != InConstantType.FLOAT_FAMILY && inConstantType != InConstantType.STRING_FAMILY) { - LOG.info("Cannot vectorize struct field " + fieldNames.get(f) + setExpressionIssue(expressionTitle, + "Cannot vectorize struct field " + fieldNames.get(f) + " of type " + fieldTypeInfo.getTypeName()); return false; } @@ -1861,31 +2088,28 @@ public class Vectorizer implements PhysicalPlanResolver { return true; } - private boolean validateExprNodeDesc(ExprNodeDesc desc) { - return validateExprNodeDesc(desc, VectorExpressionDescriptor.Mode.PROJECTION); + private boolean validateExprNodeDesc(ExprNodeDesc desc, String expressionTitle) { + return validateExprNodeDesc(desc, expressionTitle, VectorExpressionDescriptor.Mode.PROJECTION); } - boolean validateExprNodeDesc(ExprNodeDesc desc, VectorExpressionDescriptor.Mode mode) { - if (!validateExprNodeDescRecursive(desc, mode)) { + boolean validateExprNodeDesc(ExprNodeDesc desc, String expressionTitle, + VectorExpressionDescriptor.Mode mode) { + if (!validateExprNodeDescRecursive(desc, expressionTitle, mode)) { return false; } try { VectorizationContext vc = new ValidatorVectorizationContext(hiveConf); if (vc.getVectorExpression(desc, mode) == null) { // TODO: this cannot happen - VectorizationContext throws in such cases. - LOG.info("getVectorExpression returned null"); + setExpressionIssue(expressionTitle, "getVectorExpression returned null"); return false; } } catch (Exception e) { if (e instanceof HiveException) { - LOG.info(e.getMessage()); + setExpressionIssue(expressionTitle, e.getMessage()); } else { - if (LOG.isDebugEnabled()) { - // Show stack trace. 
- LOG.debug("Failed to vectorize", e); - } else { - LOG.info("Failed to vectorize", e.getMessage()); - } + String issue = "exception: " + VectorizationContext.getStackTraceAsSingleLine(e); + setExpressionIssue(expressionTitle, issue); } return false; } @@ -1905,9 +2129,9 @@ public class Vectorizer implements PhysicalPlanResolver { } } - private boolean validateAggregationIsPrimitive(VectorAggregateExpression vectorAggrExpr) { + public static ObjectInspector.Category aggregationOutputCategory(VectorAggregateExpression vectorAggrExpr) { ObjectInspector outputObjInspector = vectorAggrExpr.getOutputObjectInspector(); - return (outputObjInspector.getCategory() == ObjectInspector.Category.PRIMITIVE); + return outputObjInspector.getCategory(); } private Pair<Boolean,Boolean> validateAggregationDesc(AggregationDesc aggDesc, ProcessingMode processingMode, @@ -1915,11 +2139,10 @@ public class Vectorizer implements PhysicalPlanResolver { String udfName = aggDesc.getGenericUDAFName().toLowerCase(); if (!supportedAggregationUdfs.contains(udfName)) { - LOG.info("Cannot vectorize groupby aggregate expression: UDF " + udfName + " not supported"); + setExpressionIssue("Aggregation Function", "UDF " + udfName + " not supported"); return new Pair<Boolean,Boolean>(false, false); } - if (aggDesc.getParameters() != null && !validateExprNodeDesc(aggDesc.getParameters())) { - LOG.info("Cannot vectorize groupby aggregate expression: UDF parameters not supported"); + if (aggDesc.getParameters() != null && !validateExprNodeDesc(aggDesc.getParameters(), "Aggregation Function UDF " + udfName + " parameter")) { return new Pair<Boolean,Boolean>(false, false); } @@ -1933,6 +2156,7 @@ public class Vectorizer implements PhysicalPlanResolver { if (LOG.isDebugEnabled()) { LOG.debug("Vectorization of aggreation should have succeeded ", e); } + setExpressionIssue("Aggregation Function", "Vectorization of aggreation should have succeeded " + e); return new Pair<Boolean,Boolean>(false, false); } if (LOG.isDebugEnabled()) { @@ -1940,11 +2164,12 @@ public class Vectorizer implements PhysicalPlanResolver { " vector expression " + vectorAggrExpr.toString()); } - boolean outputIsPrimitive = validateAggregationIsPrimitive(vectorAggrExpr); + ObjectInspector.Category outputCategory = aggregationOutputCategory(vectorAggrExpr); + boolean outputIsPrimitive = (outputCategory == ObjectInspector.Category.PRIMITIVE); if (processingMode == ProcessingMode.MERGE_PARTIAL && hasKeys && !outputIsPrimitive) { - LOG.info("Vectorized Reduce MergePartial GROUP BY keys can only handle aggregate outputs that are primitive types"); + setOperatorIssue("Vectorized Reduce MergePartial GROUP BY keys can only handle aggregate outputs that are primitive types"); return new Pair<Boolean,Boolean>(false, false); } @@ -2012,12 +2237,12 @@ public class Vectorizer implements PhysicalPlanResolver { if (smallTableIndices[i] < 0) { // Negative numbers indicate a column to be (deserialize) read from the small table's // LazyBinary value row. - LOG.info("Vectorizer isBigTableOnlyResults smallTableIndices[i] < 0 returning false"); + setOperatorIssue("Vectorizer isBigTableOnlyResults smallTableIndices[i] < 0 returning false"); return false; } } } else if (smallTableRetainSize > 0) { - LOG.info("Vectorizer isBigTableOnlyResults smallTableRetainSize > 0 returning false"); + setOperatorIssue("Vectorizer isBigTableOnlyResults smallTableRetainSize > 0 returning false"); return false; } @@ -2026,20 +2251,21 @@ public class Vectorizer implements PhysicalPlanResolver { } Operator<? 
@@ -2026,20 +2251,21 @@
   }
 
   Operator<? extends OperatorDesc> specializeMapJoinOperator(Operator<? extends OperatorDesc> op,
-      VectorizationContext vContext, MapJoinDesc desc) throws HiveException {
+      VectorizationContext vContext, MapJoinDesc desc, VectorMapJoinInfo vectorMapJoinInfo)
+          throws HiveException {
     Operator<? extends OperatorDesc> vectorOp = null;
     Class<? extends Operator<?>> opClass = null;
 
-    VectorMapJoinDesc.HashTableImplementationType hashTableImplementationType = HashTableImplementationType.NONE;
-    VectorMapJoinDesc.HashTableKind hashTableKind = HashTableKind.NONE;
-    VectorMapJoinDesc.HashTableKeyType hashTableKeyType = HashTableKeyType.NONE;
+    VectorMapJoinDesc vectorDesc = (VectorMapJoinDesc) desc.getVectorDesc();
+
+    HashTableImplementationType hashTableImplementationType = HashTableImplementationType.NONE;
+    HashTableKind hashTableKind = HashTableKind.NONE;
+    HashTableKeyType hashTableKeyType = HashTableKeyType.NONE;
+    OperatorVariation operatorVariation = OperatorVariation.NONE;
 
-    if (HiveConf.getBoolVar(hiveConf,
-          HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_FAST_HASHTABLE_ENABLED)) {
+    if (vectorDesc.getIsFastHashTableEnabled()) {
       hashTableImplementationType = HashTableImplementationType.FAST;
     } else {
-      // Restrict to using BytesBytesMultiHashMap via MapJoinBytesTableContainer or
-      // HybridHashTableContainer.
       hashTableImplementationType = HashTableImplementationType.OPTIMIZED;
     }
@@ -2061,20 +2287,31 @@
     Map<Byte, List<ExprNodeDesc>> keyExprs = desc.getKeys();
     List<ExprNodeDesc> bigTableKeyExprs = keyExprs.get(posBigTable);
     if (bigTableKeyExprs.size() == 1) {
-      String typeName = bigTableKeyExprs.get(0).getTypeString();
-      LOG.info("Vectorizer vectorizeOperator map join typeName " + typeName);
-      if (typeName.equals("boolean")) {
+      TypeInfo typeInfo = bigTableKeyExprs.get(0).getTypeInfo();
+      LOG.info("Vectorizer vectorizeOperator map join typeName " + typeInfo.getTypeName());
+      switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
+      case BOOLEAN:
         hashTableKeyType = HashTableKeyType.BOOLEAN;
-      } else if (typeName.equals("tinyint")) {
+        break;
+      case BYTE:
         hashTableKeyType = HashTableKeyType.BYTE;
-      } else if (typeName.equals("smallint")) {
+        break;
+      case SHORT:
         hashTableKeyType = HashTableKeyType.SHORT;
-      } else if (typeName.equals("int")) {
+        break;
+      case INT:
         hashTableKeyType = HashTableKeyType.INT;
-      } else if (typeName.equals("bigint") || typeName.equals("long")) {
+        break;
+      case LONG:
         hashTableKeyType = HashTableKeyType.LONG;
-      } else if (VectorizationContext.isStringFamily(typeName)) {
+        break;
+      case STRING:
+      case CHAR:
+      case VARCHAR:
+      case BINARY:
         hashTableKeyType = HashTableKeyType.STRING;
+      default:
        // Stay with multi-key.
       }
     }
   }
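The hunk above replaces string comparisons on type names ("boolean", "tinyint", ...) with a switch over the key's PrimitiveCategory, folding the whole string family (STRING, CHAR, VARCHAR, BINARY) into the single byte-comparable STRING hash table key type and leaving every other category on the multi-key path. A hedged sketch of the same mapping, using stand-in enums rather than Hive's classes:

    public class HashTableKeyTypeChooser {
        enum PrimitiveCategory { BOOLEAN, BYTE, SHORT, INT, LONG, STRING, CHAR, VARCHAR, BINARY, DECIMAL, TIMESTAMP }
        enum HashTableKeyType { BOOLEAN, BYTE, SHORT, INT, LONG, STRING, MULTI_KEY }

        static HashTableKeyType forSingleKey(PrimitiveCategory category) {
            switch (category) {
            case BOOLEAN: return HashTableKeyType.BOOLEAN;
            case BYTE:    return HashTableKeyType.BYTE;
            case SHORT:   return HashTableKeyType.SHORT;
            case INT:     return HashTableKeyType.INT;
            case LONG:    return HashTableKeyType.LONG;
            case STRING:
            case CHAR:
            case VARCHAR:
            case BINARY:  return HashTableKeyType.STRING;    // one byte-comparable family
            default:      return HashTableKeyType.MULTI_KEY; // stay with multi-key
            }
        }

        public static void main(String[] args) {
            System.out.println(forSingleKey(PrimitiveCategory.VARCHAR)); // STRING
            System.out.println(forSingleKey(PrimitiveCategory.DECIMAL)); // MULTI_KEY
        }
    }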
@@ -2082,16 +2319,20 @@
     switch (joinType) {
     case JoinDesc.INNER_JOIN:
       if (!isInnerBigOnly) {
+        operatorVariation = OperatorVariation.INNER;
         hashTableKind = HashTableKind.HASH_MAP;
       } else {
+        operatorVariation = OperatorVariation.INNER_BIG_ONLY;
         hashTableKind = HashTableKind.HASH_MULTISET;
       }
       break;
     case JoinDesc.LEFT_OUTER_JOIN:
     case JoinDesc.RIGHT_OUTER_JOIN:
+      operatorVariation = OperatorVariation.OUTER;
       hashTableKind = HashTableKind.HASH_MAP;
       break;
     case JoinDesc.LEFT_SEMI_JOIN:
+      operatorVariation = OperatorVariation.LEFT_SEMI;
       hashTableKind = HashTableKind.HASH_SET;
       break;
     default:
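With the operator variation recorded per join type above, the hunk below can flatten the old nested join-type switches into one switch per hash table key type. The specialized classes follow a regular VectorMapJoin<Variation><KeyType>Operator naming scheme; this sketch (stand-in enums, class names as plain strings, not Hive's actual dispatch code) shows the (variation x key type) lookup the switches encode:

    public class MapJoinOpClassChooser {
        enum HashTableKeyType { LONG, STRING, MULTI_KEY }
        enum OperatorVariation { INNER, INNER_BIG_ONLY, LEFT_SEMI, OUTER }

        static String opClassName(OperatorVariation variation, HashTableKeyType keyType) {
            String variationPart;
            switch (variation) {
            case INNER:          variationPart = "Inner"; break;
            case INNER_BIG_ONLY: variationPart = "InnerBigOnly"; break;
            case LEFT_SEMI:      variationPart = "LeftSemi"; break;
            case OUTER:          variationPart = "Outer"; break;
            default: throw new IllegalArgumentException("Unknown operator variation " + variation);
            }
            String keyPart =
                keyType == HashTableKeyType.LONG ? "Long" :
                keyType == HashTableKeyType.STRING ? "String" : "MultiKey";
            return "VectorMapJoin" + variationPart + keyPart + "Operator";
        }

        public static void main(String[] args) {
            // e.g. OUTER + LONG -> VectorMapJoinOuterLongOperator
            System.out.println(opClassName(OperatorVariation.OUTER, HashTableKeyType.LONG));
        }
    }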
@@ -2106,86 +2347,84 @@
     case SHORT:
     case INT:
     case LONG:
-      switch (joinType) {
-      case JoinDesc.INNER_JOIN:
-        if (!isInnerBigOnly) {
-          opClass = VectorMapJoinInnerLongOperator.class;
-        } else {
-          opClass = VectorMapJoinInnerBigOnlyLongOperator.class;
-        }
+      switch (operatorVariation) {
+      case INNER:
+        opClass = VectorMapJoinInnerLongOperator.class;
         break;
-      case JoinDesc.LEFT_OUTER_JOIN:
-      case JoinDesc.RIGHT_OUTER_JOIN:
-        opClass = VectorMapJoinOuterLongOperator.class;
+      case INNER_BIG_ONLY:
+        opClass = VectorMapJoinInnerBigOnlyLongOperator.class;
         break;
-      case JoinDesc.LEFT_SEMI_JOIN:
+      case LEFT_SEMI:
         opClass = VectorMapJoinLeftSemiLongOperator.class;
         break;
+      case OUTER:
+        opClass = VectorMapJoinOuterLongOperator.class;
+        break;
       default:
-        throw new HiveException("Unknown join type " + joinType);
+        throw new HiveException("Unknown operator variation " + operatorVariation);
       }
       break;
     case STRING:
-      switch (joinType) {
-      case JoinDesc.INNER_JOIN:
-        if (!isInnerBigOnly) {
-          opClass = VectorMapJoinInnerStringOperator.class;
-        } else {
-          opClass = VectorMapJoinInnerBigOnlyStringOperator.class;
-        }
+      switch (operatorVariation) {
+      case INNER:
+        opClass = VectorMapJoinInnerStringOperator.class;
         break;
-      case JoinDesc.LEFT_OUTER_JOIN:
-      case JoinDesc.RIGHT_OUTER_JOIN:
-        opClass = VectorMapJoinOuterStringOperator.class;
+      case INNER_BIG_ONLY:
+        opClass = VectorMapJoinInnerBigOnlyStringOperator.class;
         break;
-      case JoinDesc.LEFT_SEMI_JOIN:
+      case LEFT_SEMI:
         opClass = VectorMapJoinLeftSemiStringOperator.class;
         break;
+      case OUTER:
+        opClass = VectorMapJoinOuterStringOperator.class;
+        break;
       default:
-        throw new HiveException("Unknown join type " + joinType);
+        throw new HiveException("Unknown operator variation " + operatorVariation);
       }
       break;
     case MULTI_KEY:
-      switch (joinType) {
-      case JoinDesc.INNER_JOIN:
-        if (!isInnerBigOnly) {
-          opClass = VectorMapJoinInnerMultiKeyOperator.class;
-        } else {
-          opClass = VectorMapJoinInnerBigOnlyMultiKeyOperator.class;
-        }
+      switch (operatorVariation) {
+      case INNER:
+        opClass = VectorMapJoinInnerMultiKeyOperator.class;
         break;
-      case JoinDesc.LEFT_OUTER_JOIN:
-      case JoinDesc.RIGHT_OUTER_JOIN:
-        opClass = VectorMapJoinOuterMultiKeyOperator.class;
+      case INNER_BIG_ONLY:
+        opClass = VectorMapJoinInnerBigOnlyMultiKeyOperator.class;
         break;
-      case JoinDesc.LEFT_SEMI_JOIN:
+      case LEFT_SEMI:
         opClass = VectorMapJoinLeftSemiMultiKeyOperator.class;
         break;
+      case OUTER:
+        opClass = VectorMapJoinOuterMultiKeyOperator.class;
+        break;
       default:
-        throw new HiveException("Unknown join type " + joinType);
+        throw new HiveException("Unknown operator variation " + operatorVariation);
       }
       break;
+    default:
+      throw new RuntimeException("Unexpected hash table key type " + hashTableKeyType.name());
     }
 
-    vectorOp = OperatorFactory.getVectorOperator(
-        opClass, op.getCompilationOpContext(), op.getConf(), vContext);
-    LOG.info("Vectorizer vectorizeOperator map join class " + vectorOp.getClass().getSimpleName());
-
     boolean minMaxEnabled = HiveConf.getBoolVar(hiveConf,
         HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_MINMAX_ENABLED);
 
-    VectorMapJoinDesc vectorDesc = desc.getVectorDesc();
     vectorDesc.setHashTableImplementationType(hashTableImplementationType);
     vectorDesc.setHashTableKind(hashTableKind);
     vectorDesc.setHashTableKeyType(hashTableKeyType);
+    vectorDesc.setOperatorVariation(operatorVariation);
     vectorDesc.setMinMaxEnabled(minMaxEnabled);
+    vectorDesc.setVectorMapJoinInfo(vectorMapJoinInfo);
+
+    vectorOp = OperatorFactory.getVectorOperator(
+        opClass, op.getCompilationOpContext(), op.getConf(), vContext);
+    LOG.info("Vectorizer vectorizeOperator map join class " + vectorOp.getClass().getSimpleName());
+
     return vectorOp;
   }
 
-  private boolean onExpressionHasNullSafes(MapJoinDesc desc) {
+  public static boolean onExpressionHasNullSafes(MapJoinDesc desc) {
     boolean[] nullSafes = desc.getNullSafes();
     if (nullSafes == null) {
      return false;
     }
     for (boolean nullSafe : nullSafes) {
       if (nullSafe) {
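The canSpecializeMapJoin() rewrite in the hunk below walks every big-table key and value expression, records which batch column each one writes to, and keeps only the non-identity (computed) expressions for evaluation into scratch columns. A simplified sketch of that bookkeeping, with stand-in types in place of VectorExpression and IdentityExpression:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class KeyColumnMapping {
        static final class Expr {
            final int outputColumn;   // batch column the result lands in
            final boolean identity;   // true: just passes a source column through
            Expr(int outputColumn, boolean identity) {
                this.outputColumn = outputColumn;
                this.identity = identity;
            }
        }

        static int[] mapKeyColumns(List<Expr> keyExprs, List<Expr> mustEvaluate) {
            int[] columnMap = new int[keyExprs.size()];
            for (int i = 0; i < keyExprs.size(); i++) {
                Expr e = keyExprs.get(i);
                columnMap[i] = e.outputColumn;
                if (!e.identity) {
                    mustEvaluate.add(e);   // calculations fill scratch columns
                }
            }
            return columnMap;
        }

        public static void main(String[] args) {
            List<Expr> keys = Arrays.asList(new Expr(2, true), new Expr(7, false));
            List<Expr> toEval = new ArrayList<Expr>();
            int[] map = mapKeyColumns(keys, toEval);
            System.out.println("column map: " + map[0] + ", " + map[1]
                + "; expressions to evaluate: " + toEval.size());
        }
    }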
@@ -2196,53 +2435,372 @@
   }
 
   private boolean canSpecializeMapJoin(Operator<? extends OperatorDesc> op, MapJoinDesc desc,
-      boolean isTez) {
+      boolean isTezOrSpark, VectorizationContext vContext, VectorMapJoinInfo vectorMapJoinInfo)
+          throws HiveException {
+
+    Preconditions.checkState(op instanceof MapJoinOperator);
+
+    // Allocate a VectorMapJoinDesc initially with implementation type NONE so EXPLAIN
+    // can report this operator was vectorized, but not native.  And, the conditions.
+    VectorMapJoinDesc vectorDesc = new VectorMapJoinDesc();
+    desc.setVectorDesc(vectorDesc);
+
+    boolean isVectorizationMapJoinNativeEnabled = HiveConf.getBoolVar(hiveConf,
+        HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_ENABLED);
+
+    String engine = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
+
+    boolean oneMapJoinCondition = (desc.getConds().length == 1);
+
+    boolean hasNullSafes = onExpressionHasNullSafes(desc);
+
+    byte posBigTable = (byte) desc.getPosBigTable();
+
+    // Since we want to display all the met and not met conditions in EXPLAIN, we determine all
+    // information first....
+
+    List<ExprNodeDesc> keyDesc = desc.getKeys().get(posBigTable);
+    VectorExpression[] allBigTableKeyExpressions = vContext.getVectorExpressions(keyDesc);
+    final int allBigTableKeyExpressionsLength = allBigTableKeyExpressions.length;
+    boolean isEmptyKey = (allBigTableKeyExpressionsLength == 0);
 
-    boolean specialize = false;
+    boolean supportsKeyTypes = true;  // Assume.
+    HashSet<String> notSupportedKeyTypes = new HashSet<String>();
 
-    if (op instanceof MapJoinOperator &&
+    // Since a key expression can be a calculation and the key will go into a scratch column,
+    // we need the mapping and type information.
+    int[] bigTableKeyColumnMap = new int[allBigTableKeyExpressionsLength];
+    String[] bigTableKeyColumnNames = new String[allBigTableKeyExpressionsLength];
+    TypeInfo[] bigTableKeyTypeInfos = new TypeInfo[allBigTableKeyExpressionsLength];
+    ArrayList<VectorExpression> bigTableKeyExpressionsList = new ArrayList<VectorExpression>();
+    VectorExpression[] bigTableKeyExpressions;
+    for (int i = 0; i < allBigTableKeyExpressionsLength; i++) {
+      VectorExpression ve = allBigTableKeyExpressions[i];
+      if (!IdentityExpression.isColumnOnly(ve)) {
+        bigTableKeyExpressionsList.add(ve);
+      }
+      bigTableKeyColumnMap[i] = ve.getOutputColumn();
+
+      ExprNodeDesc exprNode = keyDesc.get(i);
+      bigTableKeyColumnNames[i] = exprNode.toString();
+
+      TypeInfo typeInfo = exprNode.getTypeInfo();
+      // Verify we handle the key column types for an optimized table.  This is effectively the
+      // same check used in HashTableLoader.
+      if (!MapJoinKey.isSupportedField(typeInfo)) {
+        supportsKeyTypes = false;
+        Category category = typeInfo.getCategory();
+        notSupportedKeyTypes.add(
+            (category != Category.PRIMITIVE ? category.toString() :
+                ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory().toString()));
+      }
+      bigTableKeyTypeInfos[i] = typeInfo;
+    }
+    if (bigTableKeyExpressionsList.size() == 0) {
+      bigTableKeyExpressions = null;
+    } else {
+      bigTableKeyExpressions = bigTableKeyExpressionsList.toArray(new VectorExpression[0]);
+    }
+
+    List<ExprNodeDesc> bigTableExprs = desc.getExprs().get(posBigTable);
+    VectorExpression[] allBigTableValueExpressions = vContext.getVectorExpressions(bigTableExprs);
+
+    boolean isFastHashTableEnabled =
         HiveConf.getBoolVar(hiveConf,
-            HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_ENABLED)) {
+            HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_FAST_HASHTABLE_ENABLED);
+    vectorDesc.setIsFastHashTableEnabled(isFastHashTableEnabled);
 
-    // Currently, only under Tez and non-N-way joins.
-    if (isTez && desc.getConds().length == 1 && !onExpressionHasNullSafes(desc)) {
+    // Especially since LLAP is prone to turn it off in the MapJoinDesc in later
+    // physical optimizer stages...
+    boolean isHybridHashJoin = desc.isHybridHashJoin();
+    vectorDesc.setIsHybridHashJoin(isHybridHashJoin);
 
-      // Ok, all basic restrictions satisfied so far...
-      specialize = true;
+    /*
+     * Populate vectorMapJoinInfo.
+     */
 
-      if (!HiveConf.getBoolVar(hiveConf,
-          HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_FAST_HASHTABLE_ENABLED)) {
+    /*
+     * Similarly, we need a mapping since a value expression can be a calculation and the value
+     * will go into a scratch column.
+     */
+    int[] bigTableValueColumnMap = new int[allBigTableValueExpressions.length];
+    String[] bigTableValueColumnNames = new String[allBigTableValueExpressions.length];
+    TypeInfo[] bigTableValueTypeInfos = new TypeInfo[allBigTableValueExpressions.length];
+    ArrayList<VectorExpression> bigTableValueExpressionsList = new ArrayList<VectorExpression>();
+    VectorExpression[] bigTableValueExpressions;
+    for (int i = 0; i < bigTableValueColumnMap.length; i++) {
+      VectorExpression ve = allBigTableValueExpressions[i];
+      if (!IdentityExpression.isColumnOnly(ve)) {
+        bigTableValueExpressionsList.add(ve);
+      }
+      bigTableValueColumnMap[i] = ve.getOutputColumn();
+
+      ExprNodeDesc exprNode = bigTableExprs.get(i);
+      bigTableValueColumnNames[i] = exprNode.toString();
+      bigTableValueTypeInfos[i] = exprNode.getTypeInfo();
+    }
+    if (bigTableValueExpressionsList.size() == 0) {
+      bigTableValueExpressions = null;
+    } else {
+      bigTableValueExpressions = bigTableValueExpressionsList.toArray(new VectorExpression[0]);
+    }
 
-      if (!HiveConf.getBoolVar(hiveConf,
-          HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE)) {
-        specialize = false;
-      } else {
-        byte posBigTable = (byte) desc.getPosBigTable();
-        Map<Byte, List<ExprNodeDesc>> keyExprs = desc.getKeys();
-        List<ExprNodeDesc> bigTableKeyExprs = keyExprs.get(posBigTable);
-        for (ExprNodeDesc exprNodeDesc : bigTableKeyExprs) {
-          String typeName = exprNodeDesc.getTypeString();
-          if (!MapJoinKey.isSupportedField(typeName)) {
-            specialize = false;
-            break;
-          }
+    vectorMapJoinInfo.setBigTableKeyColumnMap(bigTableKeyColumnMap);
+    vectorMapJoinInfo.setBigTableKeyColumnNames(bigTableKeyColumnNames);
+    vectorMapJoinInfo.setBigTableKeyTypeInfos(bigTableKeyTypeInfos);
+    vectorMapJoinInfo.setBigTableKeyExpressions(bigTableKeyExpressions);
+
+    vectorMapJoinInfo.setBigTableValueColumnMap(bigTableValueColumnMap);
+    vectorMapJoinInfo.setBigTableValueColumnNames(bigTableValueColumnNames);
+    vectorMapJoinInfo.setBigTableValueTypeInfos(bigTableValueTypeInfos);
+    vectorMapJoinInfo.setBigTableValueExpressions(bigTableValueExpressions);
+
+    /*
+     * Small table information.
+     */
+    VectorColumnOutputMapping bigTableRetainedMapping =
+        new VectorColumnOutputMapping("Big Table Retained Mapping");
+
+    VectorColumnOutputMapping bigTableOuterKeyMapping =
+        new VectorColumnOutputMapping("Big Table Outer Key Mapping");
+
+    // The order of the fields in the LazyBinary small table value must be used, so
+    // we use the source ordering flavor for the mapping.
+    VectorColumnSourceMapping smallTableMapping =
+        new VectorColumnSourceMapping("Small Table Mapping");
+
+    Byte[] order = desc.getTagOrder();
+    Byte posSingleVectorMapJoinSmallTable = (order[0] == posBigTable ? order[1] : order[0]);
+    boolean isOuterJoin = !desc.getNoOuterJoin();
+
+    /*
+     * Gather up big and small table output result information from the MapJoinDesc.
+     */
+    List<Integer> bigTableRetainList = desc.getRetainList().get(posBigTable);
+    int bigTableRetainSize = bigTableRetainList.size();
+
+    int[] smallTableIndices;
+    int smallTableIndicesSize;
+    List<ExprNodeDesc> smallTableExprs = desc.getExprs().get(posSingleVectorMapJoinSmallTable);
+    if (desc.getValueIndices() != null && desc.getValueIndices().get(posSingleVectorMapJoinSmallTable) != null) {
+      smallTableIndices = desc.getValueIndices().get(posSingleVectorMapJoinSmallTable);
+      smallTableIndicesSize = smallTableIndices.length;
+    } else {
+      smallTableIndices = null;
+      smallTableIndicesSize = 0;
+    }
+
+    List<Integer> smallTableRetainList = desc.getRetainList().get(posSingleVectorMapJoinSmallTable);
+    int smallTableRetainSize = smallTableRetainList.size();
+
+    int smallTableResultSize = 0;
+    if (smallTableIndicesSize > 0) {
+      smallTableResultSize = smallTableIndicesSize;
+    } else if (smallTableRetainSize > 0) {
+      smallTableResultSize = smallTableRetainSize;
+    }
+
+    /*
+     * Determine the big table retained mapping first so we can optimize out (with
+     * projection) copying inner join big table keys in the subsequent small table results section.
+     */
+
+    // We use a mapping object here so we can build the projection in any order and
+    // get the ordered by 0 to n-1 output columns at the end.
+    //
+    // Also, to avoid copying a big table key into the small table result area for inner joins,
+    // we reference it with the projection so there can be duplicate output columns
+    // in the projection.
+    VectorColumnSourceMapping projectionMapping = new VectorColumnSourceMapping("Projection Mapping");
+
+    int nextOutputColumn = (order[0] == posBigTable ? 0 : smallTableResultSize);
+    for (int i = 0; i < bigTableRetainSize; i++) {
+
+      // Since bigTableValueExpressions may do a calculation and produce a scratch column, we
+      // need to map to the right batch column.
+
+      int retainColumn = bigTableRetainList.get(i);
+      int batchColumnIndex = bigTableValueColumnMap[retainColumn];
+      TypeInfo typeInfo = bigTableValueTypeInfos[i];
+
+      // With this map we project the big table batch to make it look like an output batch.
+      projectionMapping.add(nextOutputColumn, batchColumnIndex, typeInfo);
+
+      // Collect columns we copy from the big table batch to the overflow batch.
+      if (!bigTableRetainedMapping.containsOutputColumn(batchColumnIndex)) {
+        // Tolerate repeated use of a big table column.
+        bigTableRetainedMapping.add(batchColumnIndex, batchColumnIndex, typeInfo);
+      }
+
+      nextOutputColumn++;
+    }
+
+    /*
+     * Now determine the small table results.
+     */
+    boolean smallTableExprVectorizes = true;
+
+    int firstSmallTableOutputColumn;
+    firstSmallTableOutputColumn = (order[0] == posBigTable ? bigTableRetainSize : 0);
+    int smallTableOutputCount = 0;
+    nextOutputColumn = firstSmallTableOutputColumn;
+
+    // Small table indices has more information (i.e. keys) than retain, so use it if it exists...
+    String[] bigTableRetainedNames;
+    if (smallTableIndicesSize > 0) {
+      smallTableOutputCount = smallTableIndicesSize;
+      bigTableRetainedNames = new String[smallTableOutputCount];
+
+      for (int i = 0; i < smallTableIndicesSize; i++) {
+        if (smallTableIndices[i] >= 0) {
+
+          // Zero and above numbers indicate a big table key is needed for
+          // small table result "area".
+
+          int keyIndex = smallTableIndices[i];
+
+          // Since bigTableKeyExpressions may do a calculation and produce a scratch column, we
+          // need to map the right column.
+          int batchKeyColumn = bigTableKeyColumnMap[keyIndex];
+          bigTableRetainedNames[i] = bigTableKeyColumnNames[keyIndex];
+          TypeInfo typeInfo = bigTableKeyTypeInfos[keyIndex];
+
+          if (!isOuterJoin) {
+
+            // Optimize inner join keys of small table results.
+
+            // Project the big table key into the small table result "area".
+            projectionMapping.add(nextOutputColumn, batchKeyColumn, typeInfo);
+
+            if (!bigTableRetainedMapping.containsOutputColumn(batchKeyColumn)) {
+              // If necessary, copy the big table key into the overflow batch's small table
+              // result "area".
+              bigTableRetainedMapping.add(batchKeyColumn, batchKeyColumn, typeInfo);
           }
         } else {
+
+            // For outer joins, since the small table key can be null when there is no match,
+            // we must have a physical (scratch) column for those keys.  We cannot use the
+            // projection optimization used by inner joins above.
+
+            int scratchColumn = vContext.allocateScratchColumn(typeInfo.getTypeName());
+            projectionMapping.add(nextOutputColumn, scratchColumn, typeInfo);
+
+            bigTableRetainedMapping.add(batchKeyColumn, scratchColumn, typeInfo);
+
+            bigTableOuterKeyMapping.add(batchKeyColumn, scratchColumn, typeInfo);
          }
        } else {
-          // With the fast hash table implementation, we currently do not support
-          // Hybrid Grace Hash Join.
+          // Negative numbers indicate a column to be (deserialize) read from the small table's
+          // LazyBinary value row.
+          int smallTableValueIndex = -smallTableIndices[i] - 1;
 
-          if (desc.isHybridHashJoin()) {
-            specialize = false;
+          ExprNodeDesc smallTableExprNode = smallTableExprs.get(i);
+          if (!validateExprNodeDesc(smallTableExprNode, "Small Table")) {
+            clearNotVectorizedReason();
+            smallTableExprVectorizes = false;
           }
+
+          bigTableRetainedNames[i] = smallTableExprNode.toString();
+
+          TypeInfo typeInfo = smallTableExprNode.getTypeInfo();
+
+          // Make a new big table scratch column for the small table value.
+          int scratchColumn = vContext.allocateScratchColumn(typeInfo.getTypeName());
+          projectionMapping.add(nextOutputColumn, scratchColumn, typeInfo);
+
+          smallTableMapping.add(smallTableValueIndex, scratchColumn, typeInfo);
+        }
+        nextOutputColumn++;
+      }
+    } else if (smallTableRetainSize > 0) {
+      smallTableOutputCount = smallTableRetainSize;
+      bigTableRetainedNames = new String[smallTableOutputCount];
+
+      // Only small table values appear in join output result.
+
+      for (int i = 0; i < smallTableRetainSize; i++) {
+        int smallTableValueIndex = smallTableRetainList.get(i);
+
+        ExprNodeDesc smallTableExprNode = smallTableExprs.get(i);
+        if (!validateExprNodeDesc(smallTableExprNode, "Small Table")) {
+          clearNotVectorizedReason();
+          smallTableExprVectorizes = false;
        }
+
+        bigTableRetainedNames[i] = smallTableExprNode.toString();
+
+        // Make a new big table scratch column for the small table value.
+        TypeInfo typeInfo = smallTableExprNode.getTypeInfo();
+        int scratchColumn = vContext.allocateScratchColumn(typeInfo.getTypeName());
+
+        projectionMapping.add(nextOutputColumn, scratchColumn, typeInfo);
+
+        smallTableMapping.add(smallTableValueIndex, scratchColumn, typeInfo);
+        nextOutputColumn++;
      }
+    } else {
+      bigTableRetainedNames = new String[0];
+    }
+
+    // Remember the condition variables for EXPLAIN regardless.
+    vectorDesc.setIsVectorizationMapJoinNativeEnabled(isVectorizationMapJoinNativeEnabled);
+    vectorDesc.setEngine(engine);
+    vectorDesc.setOneMapJoinCondition(oneMapJoinCondition);
+    vectorDesc.setHasNullSafes(hasNullSafes);
+    vectorDesc.setSupportsKeyTypes(supportsKeyTypes);
+    if (!supportsKeyTypes) {
+      vectorDesc.setNotSupportedKeyTypes(new ArrayList(notSupportedKeyTypes));
+    }
+    vectorDesc.setIsEmptyKey(isEmptyKey);
+    vectorDesc.setSmallTableExprVectorizes(smallTableExprVectorizes);
+
+    // Currently, only under Tez and non-N-way joi
<TRUNCATED>
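For reference, the smallTableIndices convention the truncated hunk walks through: a non-negative entry selects a big-table key column for the small-table result area, while a negative entry encodes position -(index) - 1 in the small table's LazyBinary value row. A small, self-contained decoder illustrating the convention (illustrative only, not Hive code):

    public class SmallTableIndexDecoder {
        public static void main(String[] args) {
            int[] smallTableIndices = { 0, -1, 2, -3 };
            for (int i = 0; i < smallTableIndices.length; i++) {
                int index = smallTableIndices[i];
                if (index >= 0) {
                    System.out.println("output " + i + " <- big-table key #" + index
                        + " (projected for inner joins; scratch column for outer joins)");
                } else {
                    int smallTableValueIndex = -index - 1;
                    System.out.println("output " + i + " <- small-table value #" + smallTableValueIndex
                        + " (deserialized into a new scratch column)");
                }
            }
        }
    }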