http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index a8ebf8f..ce49b24 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -28,11 +28,13 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.Stack;
import java.util.regex.Pattern;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -65,11 +67,11 @@ import
org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
import
org.apache.hadoop.hive.ql.exec.vector.VectorizationContext.InConstantType;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.exec.vector.expressions.IdentityExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import
org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -96,6 +98,7 @@ import org.apache.hadoop.hive.ql.plan.JoinDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorPartitionConversion;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
@@ -112,6 +115,7 @@ import
org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKind;
import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo;
+import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.ql.udf.UDFAcos;
import org.apache.hadoop.hive.ql.udf.UDFAsin;
@@ -163,6 +167,8 @@ import
org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import com.google.common.base.Joiner;
+
public class Vectorizer implements PhysicalPlanResolver {
protected static transient final Logger LOG =
LoggerFactory.getLogger(Vectorizer.class);
@@ -324,14 +330,51 @@ public class Vectorizer implements PhysicalPlanResolver {
supportedAggregationUdfs.add("stddev_samp");
}
+ /**
+  * Column information for one vectorized map or reduce task.
+  *
+  * Validation fills in the column names, types and partition column count; vectorization adds
+  * the scratch column type names; transferToBaseWork then publishes everything to the task's
+  * BaseWork as a VectorizedRowBatchCtx.
+  *
+  * Declared static because it uses no state of the enclosing Vectorizer.
+  */
+ private static class VectorTaskColumnInfo {
+ // All row column names; any partition columns come last.
+ List<String> columnNames;
+ // TypeInfo of each entry in columnNames, in the same order.
+ List<TypeInfo> typeInfos;
+ // Number of trailing entries in columnNames that are partition columns.
+ int partitionColumnCount;
+
+ // Scratch column type names reported by the task's VectorizationContext.
+ String[] scratchTypeNameArray;
+
+ VectorTaskColumnInfo() {
+ partitionColumnCount = 0;
+ }
+
+ public void setColumnNames(List<String> columnNames) {
+ this.columnNames = columnNames;
+ }
+ public void setTypeInfos(List<TypeInfo> typeInfos) {
+ this.typeInfos = typeInfos;
+ }
+ public void setPartitionColumnCount(int partitionColumnCount) {
+ this.partitionColumnCount = partitionColumnCount;
+ }
+ public void setScratchTypeNameArray(String[] scratchTypeNameArray) {
+ this.scratchTypeNameArray = scratchTypeNameArray;
+ }
+
+ /**
+  * Builds a VectorizedRowBatchCtx from the collected column information and sets it on
+  * baseWork.
+  *
+  * @param baseWork the map or reduce work being vectorized
+  * @throws IllegalStateException if the column names or types were never set
+  */
+ public void transferToBaseWork(BaseWork baseWork) {
+ if (columnNames == null || typeInfos == null) {
+ // Fail with a clear message instead of an NPE from toArray below.
+ throw new IllegalStateException(
+ "Column names and types must be set before calling transferToBaseWork");
+ }
+
+ String[] columnNameArray = columnNames.toArray(new String[0]);
+ TypeInfo[] typeInfoArray = typeInfos.toArray(new TypeInfo[0]);
+
+ VectorizedRowBatchCtx vectorizedRowBatchCtx =
+ new VectorizedRowBatchCtx(
+ columnNameArray,
+ typeInfoArray,
+ partitionColumnCount,
+ scratchTypeNameArray);
+ baseWork.setVectorizedRowBatchCtx(vectorizedRowBatchCtx);
+ }
+ }
+
class VectorizationDispatcher implements Dispatcher {
- private List<String> reduceColumnNames;
- private List<TypeInfo> reduceTypeInfos;
+ // NOTE(review): physicalContext is only stored here; nothing in the visible diff reads it.
+ // Confirm it is needed.
+ private final PhysicalContext physicalContext;
+ /**
+  * Creates a dispatcher for the given physical plan context.
+  *
+  * @param physicalContext the physical plan context, kept in the physicalContext field
+  */
public VectorizationDispatcher(PhysicalContext physicalContext) {
- reduceColumnNames = null;
- reduceTypeInfos = null;
+ this.physicalContext = physicalContext;
}
@Override
@@ -369,9 +412,10 @@ public class Vectorizer implements PhysicalPlanResolver {
}
+ /**
+  * Vectorizes mapWork if it passes validation.
+  *
+  * One VectorTaskColumnInfo is shared by both steps: validateMapWork fills in the column
+  * names, types and partition column count, and vectorizeMapWork consumes them.
+  */
private void convertMapWork(MapWork mapWork, boolean isTez) throws
SemanticException {
- boolean ret = validateMapWork(mapWork, isTez);
+ VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo();
+ boolean ret = validateMapWork(mapWork, vectorTaskColumnInfo, isTez);
if (ret) {
- vectorizeMapWork(mapWork, isTez);
+ vectorizeMapWork(mapWork, vectorTaskColumnInfo, isTez);
}
}
@@ -382,40 +426,262 @@ public class Vectorizer implements PhysicalPlanResolver {
+ ReduceSinkOperator.getOperatorName()), np);
}
- private boolean validateMapWork(MapWork mapWork, boolean isTez) throws
SemanticException {
- LOG.info("Validating MapWork...");
+ /**
+  * Finds the single TableScanOperator among the MapWork's alias-to-work root operators.
+  *
+  * @return the (alias, TableScanOperator) pair, or null when the MapWork has no
+  *         alias-to-work entries, has a null root operator, or has zero or more than one
+  *         TableScanOperator (all of which fail vectorization validation)
+  */
+ private ImmutablePair<String, TableScanOperator> verifyOnlyOneTableScanOperator(MapWork mapWork) {
// Eliminate MR plans with more than one TableScanOperator.
+
LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork =
mapWork.getAliasToWork();
if ((aliasToWork == null) || (aliasToWork.size() == 0)) {
- return false;
+ return null;
}
int tableScanCount = 0;
- for (Operator<?> op : aliasToWork.values()) {
+ String alias = "";
+ TableScanOperator tableScanOperator = null;
+ for (Entry<String, Operator<? extends OperatorDesc>> entry : aliasToWork.entrySet()) {
+ Operator<?> op = entry.getValue();
if (op == null) {
LOG.warn("Map work has invalid aliases to work with. Fail
validation!");
- return false;
+ return null;
}
if (op instanceof TableScanOperator) {
tableScanCount++;
+ alias = entry.getKey();
+ tableScanOperator = (TableScanOperator) op;
}
}
if (tableScanCount > 1) {
- LOG.warn("Map work has more than 1 TableScanOperator aliases to work
with. Fail validation!");
- return false;
+ LOG.warn("Map work has more than 1 TableScanOperator. Fail validation!");
+ return null;
+ }
+ if (tableScanOperator == null) {
+ // Without a TableScanOperator there is no schema to validate; returning a pair with a
+ // null operator would make the schema lookup in the caller dereference null.
+ LOG.warn("Map work has no TableScanOperator. Fail validation!");
+ return null;
+ }
+ return new ImmutablePair<String, TableScanOperator>(alias, tableScanOperator);
+ }
+
+ /**
+  * Appends the internal name and TypeInfo of every non-virtual column in the
+  * TableScanOperator's row schema to the two output lists, in schema order.
+  *
+  * @param tableScanOperator the operator whose RowSchema is read
+  * @param logicalColumnNameList output: column internal names are appended here
+  * @param logicalTypeInfoList output: the matching TypeInfo of each column is appended here
+  */
+ private void getTableScanOperatorSchemaInfo(TableScanOperator tableScanOperator,
+ List<String> logicalColumnNameList, List<TypeInfo> logicalTypeInfoList) {
+
+ // Virtual columns are skipped here; validation will later exclude vectorization of
+ // virtual column usage (HIVE-5560).
+ RowSchema rowSchema = tableScanOperator.getSchema();
+ for (ColumnInfo c : rowSchema.getSignature()) {
+ if (isVirtualColumn(c)) {
+ continue;
+ }
+ logicalColumnNameList.add(c.getInternalName());
+ logicalTypeInfoList.add(TypeInfoUtils.getTypeInfoFromTypeString(c.getTypeName()));
+ }
+ }
+
+ /**
+  * Joins the sublist columnNames[start, start + length) using separator.
+  *
+  * @throws IndexOutOfBoundsException if the range lies outside columnNames
+  */
+ private String getColumns(List<String> columnNames, int start, int length,
+ char separator) {
+ return Joiner.on(separator).join(columnNames.subList(start, start + length));
+ }
+
+ /**
+  * Returns the TypeInfoUtils type string for the sublist typeInfos[start, start + length).
+  *
+  * @throws IndexOutOfBoundsException if the range lies outside typeInfos
+  */
+ private String getTypes(List<TypeInfo> typeInfos, int start, int length) {
+ return TypeInfoUtils.getTypesString(typeInfos.subList(start, start + length));
+ }
+
+ /**
+  * Marks pd for vectorized reading when its input file format is vectorized.
+  *
+  * @param pd the partition to check; on success its VectorPartitionDesc is set
+  * @return true if Utilities.isInputFileFormatVectorized(pd) holds, false otherwise
+  */
+ private boolean verifyAndSetVectorPartDesc(PartitionDesc pd) {
+
+ // Look for the pass-through case where the InputFileFormat has
+ // VectorizedInputFormatInterface and reads a VectorizedRowBatch as a "row".
+
+ if (Utilities.isInputFileFormatVectorized(pd)) {
+
+
pd.setVectorPartitionDesc(VectorPartitionDesc.createVectorizedInputFileFormat());
+
+ return true;
}
+ LOG.info("Input format: " + pd.getInputFileFormatClassName()
+ + ", doesn't provide vectorized input");
+
+ return false;
+ }
+
+ /**
+  * Validates that every input path of the given alias can be read vectorized, and records
+  * per-partition schema conversion information.
+  *
+  * For each path this checks that:
+  * - the path maps to alias;
+  * - its input file format is vectorized (verifyAndSetVectorPartDesc);
+  * - its data column names match the table's leading data columns;
+  * - when its VectorPartitionDesc needs a conversion check, any type differences are
+  *   accepted by VectorPartitionConversion.
+  *
+  * Each partition's VectorPartitionDesc then receives its conversion flags (null when no
+  * conversion is needed) and its data TypeInfos. On success, vectorTaskColumnInfo receives
+  * all column names and types (data columns followed by partition columns) and the
+  * partition column count.
+  *
+  * @return true if the input can be vectorized, false otherwise
+  */
+ private boolean validateInputFormatAndSchemaEvolution(MapWork mapWork, String alias,
+ TableScanOperator tableScanOperator, VectorTaskColumnInfo vectorTaskColumnInfo) {
+
+ // These names/types are the data columns plus partition columns.
+ final List<String> allColumnNameList = new ArrayList<String>();
+ final List<TypeInfo> allTypeInfoList = new ArrayList<TypeInfo>();
+
+ getTableScanOperatorSchemaInfo(tableScanOperator, allColumnNameList, allTypeInfoList);
+ final int allColumnCount = allColumnNameList.size();
+
+ // Validate input format and schema evolution capability.
+
+ // Maps a (column names string, data types) combination to its conversion flags.
+ // The table's own schema is entered with a null value, meaning no conversion is
+ // necessary; results are cached so partitions sharing a schema are checked only once.
+ HashMap<ImmutablePair<String, List<TypeInfo>>, boolean[]> conversionMap =
+ new HashMap<ImmutablePair<String, List<TypeInfo>>, boolean[]>();
+
+ boolean isFirst = true;
+ int dataColumnCount = 0;
+ int partitionColumnCount = 0;
+
+ List<String> dataColumnList = null;
+ String dataColumnsString = "";
+ List<TypeInfo> dataTypeInfoList = null;
+
// Validate the input format
- for (String path : mapWork.getPathToPartitionInfo().keySet()) {
- PartitionDesc pd = mapWork.getPathToPartitionInfo().get(path);
- List<Class<?>> interfaceList =
- Arrays.asList(pd.getInputFileFormatClass().getInterfaces());
- if (!interfaceList.contains(VectorizedInputFormatInterface.class)) {
- LOG.info("Input format: " + pd.getInputFileFormatClassName()
- + ", doesn't provide vectorized input");
+ VectorPartitionConversion partitionConversion = new VectorPartitionConversion();
+ LinkedHashMap<String, ArrayList<String>> pathToAliases = mapWork.getPathToAliases();
+ LinkedHashMap<String, PartitionDesc> pathToPartitionInfo = mapWork.getPathToPartitionInfo();
+ for (Entry<String, ArrayList<String>> entry: pathToAliases.entrySet()) {
+ String path = entry.getKey();
+ List<String> aliases = entry.getValue();
+ boolean isPresent = (aliases != null && aliases.indexOf(alias) != -1);
+ if (!isPresent) {
+ LOG.info("Alias " + alias + " not present in aliases " + aliases);
return false;
}
+ PartitionDesc partDesc = pathToPartitionInfo.get(path);
+ if (partDesc == null) {
+ // Fail validation rather than NPE when a path has no partition information.
+ LOG.info("Could not vectorize path " + path + ". It has no partition information");
+ return false;
+ }
+ if (partDesc.getVectorPartitionDesc() != null) {
+ // We have seen this partition already.
+ // NOTE(review): if every path was seen before, the isFirst block below never runs and
+ // partitionColumnCount stays 0 even for a partitioned table -- confirm a PartitionDesc
+ // cannot already carry a VectorPartitionDesc when this is first called.
+ continue;
+ }
+ if (!verifyAndSetVectorPartDesc(partDesc)) {
+ return false;
+ }
+ VectorPartitionDesc vectorPartDesc = partDesc.getVectorPartitionDesc();
+ LOG.info("Vectorizer path: " + path + ", read type " +
+ vectorPartDesc.getVectorMapOperatorReadType().name() + ", aliases " + aliases);
+
+ Properties partProps = partDesc.getProperties();
+
+ String nextDataColumnsString =
+ partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS);
+ String nextDataTypesString =
+ partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMN_TYPES);
+ if (nextDataColumnsString == null || nextDataTypesString == null) {
+ // Without these properties the split/type parsing below would throw an NPE.
+ LOG.info("Could not vectorize partition " + path
+ + ". It is missing its column names or column types");
+ return false;
+ }
+ String[] nextDataColumns = nextDataColumnsString.split(",");
+
+ // We convert to a list of TypeInfo using a library routine since it parses the
+ // information and can handle use of different separators, etc. We cannot use the raw
+ // type string for comparison in the map because of the different separators used.
+ List<TypeInfo> nextDataTypeInfoList =
+ TypeInfoUtils.getTypeInfosFromTypeString(nextDataTypesString);
+
+ if (isFirst) {
+
+ // We establish with the first one whether the table is partitioned or not.
+
+ LinkedHashMap<String, String> partSpec = partDesc.getPartSpec();
+ if (partSpec != null && partSpec.size() > 0) {
+ partitionColumnCount = partSpec.size();
+ dataColumnCount = allColumnCount - partitionColumnCount;
+ } else {
+ partitionColumnCount = 0;
+ dataColumnCount = allColumnCount;
+ }
+
+ dataColumnList = allColumnNameList.subList(0, dataColumnCount);
+ dataColumnsString = getColumns(allColumnNameList, 0, dataColumnCount, ',');
+ dataTypeInfoList = allTypeInfoList.subList(0, dataColumnCount);
+
+ // Add the table (non-partitioned) columns and types into the map as not needing
+ // conversion (i.e. null).
+ conversionMap.put(
+ new ImmutablePair<String, List<TypeInfo>>(dataColumnsString, dataTypeInfoList), null);
+
+ isFirst = false;
+ }
+
+ ImmutablePair<String, List<TypeInfo>> columnNamesAndTypesCombination =
+ new ImmutablePair<String, List<TypeInfo>>(nextDataColumnsString, nextDataTypeInfoList);
+
+ boolean[] conversionFlags;
+ if (conversionMap.containsKey(columnNamesAndTypesCombination)) {
+
+ conversionFlags = conversionMap.get(columnNamesAndTypesCombination);
+
+ } else {
+
+ List<String> nextDataColumnList = Arrays.asList(nextDataColumns);
+
+ // Validate the column names that are present are the same. Missing columns will be
+ // implicitly defaulted to null.
+
+ if (nextDataColumnList.size() > dataColumnList.size()) {
+ LOG.info(
+ String.format("Could not vectorize partition %s. The number of partition columns %d is greater than the number of table columns %d",
+ path, nextDataColumnList.size(), dataColumnList.size()));
+ return false;
+ }
+ for (int i = 0; i < nextDataColumnList.size(); i++) {
+ String nextColumnName = nextDataColumnList.get(i);
+ String tableColumnName = dataColumnList.get(i);
+ if (!nextColumnName.equals(tableColumnName)) {
+ LOG.info(
+ String.format("Could not vectorize partition %s. The partition column name %s does not match table column name %s",
+ path, nextColumnName, tableColumnName));
+ return false;
+ }
+ }
+
+ // The table column types might have been changed with ALTER. There are restrictions
+ // here for vectorization.
+
+ // Some readers / deserializers take responsibility for conversion themselves.
+
+ // If we need to check for conversion, the conversion object may come back null
+ // indicating from a vectorization point of view the conversion is implicit. That is,
+ // all implicit integer upgrades.
+
+ if (vectorPartDesc.getNeedsDataTypeConversionCheck() &&
+ !nextDataTypeInfoList.equals(dataTypeInfoList)) {
+
+ // The results will be in 2 members: validConversion and conversionFlags
+ // NOTE(review): partitionConversion is reused for every partition while its flags are
+ // cached in conversionMap; confirm getResultConversionFlags() returns a fresh array.
+ partitionConversion.validateConversion(nextDataTypeInfoList, dataTypeInfoList);
+ if (!partitionConversion.getValidConversion()) {
+ return false;
+ }
+ conversionFlags = partitionConversion.getResultConversionFlags();
+ } else {
+ conversionFlags = null;
+ }
+
+ // We enter this in our map so we don't have to check again for subsequent partitions.
+
+ conversionMap.put(columnNamesAndTypesCombination, conversionFlags);
+ }
+
+ vectorPartDesc.setConversionFlags(conversionFlags);
+
+ vectorPartDesc.setTypeInfos(nextDataTypeInfoList);
+ }
+
+ vectorTaskColumnInfo.setColumnNames(allColumnNameList);
+ vectorTaskColumnInfo.setTypeInfos(allTypeInfoList);
+ vectorTaskColumnInfo.setPartitionColumnCount(partitionColumnCount);
+
+ return true;
+ }
+
+ private boolean validateMapWork(MapWork mapWork, VectorTaskColumnInfo
vectorTaskColumnInfo, boolean isTez)
+ throws SemanticException {
+
+ LOG.info("Validating MapWork...");
+
+ ImmutablePair<String,TableScanOperator> pair =
verifyOnlyOneTableScanOperator(mapWork);
+ if (pair == null) {
+ return false;
}
+ String alias = pair.left;
+ TableScanOperator tableScanOperator = pair.right;
+
+ // This call fills in the column names, types, and partition column
count in
+ // vectorTaskColumnInfo.
+ if (!validateInputFormatAndSchemaEvolution(mapWork, alias,
tableScanOperator, vectorTaskColumnInfo)) {
+ return false;
+ }
+
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule,
NodeProcessor>();
MapWorkValidationNodeProcessor vnp = new
MapWorkValidationNodeProcessor(mapWork, isTez);
addMapWorkRules(opRules, vnp);
@@ -437,11 +703,14 @@ public class Vectorizer implements PhysicalPlanResolver {
return true;
}
- private void vectorizeMapWork(MapWork mapWork, boolean isTez) throws
SemanticException {
+ private void vectorizeMapWork(MapWork mapWork, VectorTaskColumnInfo
vectorTaskColumnInfo,
+ boolean isTez) throws SemanticException {
+
LOG.info("Vectorizing MapWork...");
mapWork.setVectorMode(true);
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule,
NodeProcessor>();
- MapWorkVectorizationNodeProcessor vnp = new
MapWorkVectorizationNodeProcessor(mapWork, isTez);
+ MapWorkVectorizationNodeProcessor vnp =
+ new MapWorkVectorizationNodeProcessor(mapWork, isTez,
vectorTaskColumnInfo);
addMapWorkRules(opRules, vnp);
Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
GraphWalker ogw = new PreOrderOnceWalker(disp);
@@ -451,9 +720,9 @@ public class Vectorizer implements PhysicalPlanResolver {
HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
ogw.startWalking(topNodes, nodeOutput);
- mapWork.setVectorColumnNameMap(vnp.getVectorColumnNameMap());
- mapWork.setVectorColumnTypeMap(vnp.getVectorColumnTypeMap());
-
mapWork.setVectorScratchColumnTypeMap(vnp.getVectorScratchColumnTypeMap());
+
vectorTaskColumnInfo.setScratchTypeNameArray(vnp.getVectorScratchColumnTypeNames());
+
+ vectorTaskColumnInfo.transferToBaseWork(mapWork);
if (LOG.isDebugEnabled()) {
debugDisplayAllMaps(mapWork);
@@ -463,13 +732,19 @@ public class Vectorizer implements PhysicalPlanResolver {
}
+ /**
+  * Vectorizes reduceWork if it passes validation.
+  *
+  * One VectorTaskColumnInfo is shared by both steps: validateReduceWork fills in the column
+  * names and types (via getOnlyStructObjectInspectors), and vectorizeReduceWork consumes them.
+  */
private void convertReduceWork(ReduceWork reduceWork, boolean isTez)
throws SemanticException {
- boolean ret = validateReduceWork(reduceWork);
+ VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo();
+ boolean ret = validateReduceWork(reduceWork, vectorTaskColumnInfo,
isTez);
if (ret) {
- vectorizeReduceWork(reduceWork, isTez);
+ vectorizeReduceWork(reduceWork, vectorTaskColumnInfo, isTez);
}
}
- private boolean getOnlyStructObjectInspectors(ReduceWork reduceWork)
throws SemanticException {
+ private boolean getOnlyStructObjectInspectors(ReduceWork reduceWork,
+ VectorTaskColumnInfo vectorTaskColumnInfo) throws
SemanticException {
+
+ ArrayList<String> reduceColumnNames = new ArrayList<String>();
+ ArrayList<TypeInfo> reduceTypeInfos = new ArrayList<TypeInfo>();
+
try {
// Check key ObjectInspector.
ObjectInspector keyObjectInspector =
reduceWork.getKeyObjectInspector();
@@ -493,9 +768,6 @@ public class Vectorizer implements PhysicalPlanResolver {
StructObjectInspector valueStructObjectInspector =
(StructObjectInspector)valueObjectInspector;
List<? extends StructField> valueFields =
valueStructObjectInspector.getAllStructFieldRefs();
- reduceColumnNames = new ArrayList<String>();
- reduceTypeInfos = new ArrayList<TypeInfo>();
-
for (StructField field: keyFields) {
reduceColumnNames.add(Utilities.ReduceField.KEY.toString() + "." +
field.getFieldName());
reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName()));
@@ -507,6 +779,10 @@ public class Vectorizer implements PhysicalPlanResolver {
} catch (Exception e) {
throw new SemanticException(e);
}
+
+ vectorTaskColumnInfo.setColumnNames(reduceColumnNames);
+ vectorTaskColumnInfo.setTypeInfos(reduceTypeInfos);
+
return true;
}
@@ -515,11 +791,13 @@ public class Vectorizer implements PhysicalPlanResolver {
opRules.put(new RuleRegExp("R2", SelectOperator.getOperatorName() +
".*"), np);
}
- private boolean validateReduceWork(ReduceWork reduceWork) throws
SemanticException {
+ private boolean validateReduceWork(ReduceWork reduceWork,
+ VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez) throws
SemanticException {
+
LOG.info("Validating ReduceWork...");
// Validate input to ReduceWork.
- if (!getOnlyStructObjectInspectors(reduceWork)) {
+ if (!getOnlyStructObjectInspectors(reduceWork, vectorTaskColumnInfo)) {
return false;
}
// Now check the reduce operator tree.
@@ -543,7 +821,9 @@ public class Vectorizer implements PhysicalPlanResolver {
return true;
}
- private void vectorizeReduceWork(ReduceWork reduceWork, boolean isTez)
throws SemanticException {
+ private void vectorizeReduceWork(ReduceWork reduceWork,
+ VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez) throws
SemanticException {
+
LOG.info("Vectorizing ReduceWork...");
reduceWork.setVectorMode(true);
@@ -552,7 +832,7 @@ public class Vectorizer implements PhysicalPlanResolver {
// VectorizationContext... Do we use PreOrderWalker instead of
DefaultGraphWalker.
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule,
NodeProcessor>();
ReduceWorkVectorizationNodeProcessor vnp =
- new ReduceWorkVectorizationNodeProcessor(reduceColumnNames,
reduceTypeInfos, isTez);
+ new ReduceWorkVectorizationNodeProcessor(vectorTaskColumnInfo,
isTez);
addReduceWorkRules(opRules, vnp);
Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
GraphWalker ogw = new PreOrderWalker(disp);
@@ -567,9 +847,9 @@ public class Vectorizer implements PhysicalPlanResolver {
// Necessary since we are vectorizing the root operator in reduce.
reduceWork.setReducer(vnp.getRootVectorOp());
- reduceWork.setVectorColumnNameMap(vnp.getVectorColumnNameMap());
- reduceWork.setVectorColumnTypeMap(vnp.getVectorColumnTypeMap());
-
reduceWork.setVectorScratchColumnTypeMap(vnp.getVectorScratchColumnTypeMap());
+
vectorTaskColumnInfo.setScratchTypeNameArray(vnp.getVectorScratchColumnTypeNames());
+
+ vectorTaskColumnInfo.transferToBaseWork(reduceWork);
if (LOG.isDebugEnabled()) {
debugDisplayAllMaps(reduceWork);
@@ -637,23 +917,11 @@ public class Vectorizer implements PhysicalPlanResolver {
// The vectorization context for the Map or Reduce task.
protected VectorizationContext taskVectorizationContext;
- // The input projection column type name map for the Map or Reduce task.
- protected Map<Integer, String> taskColumnTypeNameMap;
-
VectorizationNodeProcessor() {
- taskColumnTypeNameMap = new HashMap<Integer, String>();
- }
-
- public Map<String, Integer> getVectorColumnNameMap() {
- return taskVectorizationContext.getProjectionColumnMap();
- }
-
- public Map<Integer, String> getVectorColumnTypeMap() {
- return taskColumnTypeNameMap;
}
- public Map<Integer, String> getVectorScratchColumnTypeMap() {
- return taskVectorizationContext.getScratchColumnTypeMap();
+ /**
+  * Returns the scratch column type names of this task's VectorizationContext.
+  */
+ public String[] getVectorScratchColumnTypeNames() {
+ return taskVectorizationContext.getScratchColumnTypeNames();
}
protected final Set<Operator<? extends OperatorDesc>> opsDone =
@@ -722,10 +990,15 @@ public class Vectorizer implements PhysicalPlanResolver {
class MapWorkVectorizationNodeProcessor extends VectorizationNodeProcessor {
+ // NOTE(review): mWork is assigned but not read anywhere in the visible diff.
+ private final MapWork mWork;
+ // Column information for this map task; its column names become the initial columns of the
+ // TableScan's VectorizationContext. Assigned once, in the constructor.
+ private final VectorTaskColumnInfo vectorTaskColumnInfo;
private final boolean isTez;
- public MapWorkVectorizationNodeProcessor(MapWork mWork, boolean isTez) {
+ /**
+  * @param mWork the MapWork being vectorized
+  * @param isTez the isTez flag passed down from vectorizeMapWork
+  * @param vectorTaskColumnInfo column information collected while validating mWork
+  */
+ public MapWorkVectorizationNodeProcessor(MapWork mWork, boolean isTez,
+ VectorTaskColumnInfo vectorTaskColumnInfo) {
super();
+ this.mWork = mWork;
+ this.vectorTaskColumnInfo = vectorTaskColumnInfo;
this.isTez = isTez;
}
@@ -739,8 +1012,7 @@ public class Vectorizer implements PhysicalPlanResolver {
if (op instanceof TableScanOperator) {
if (taskVectorizationContext == null) {
- taskVectorizationContext = getVectorizationContext(op.getSchema(),
op.getName(),
- taskColumnTypeNameMap);
+ taskVectorizationContext = getVectorizationContext(op.getName(),
vectorTaskColumnInfo);
}
vContext = taskVectorizationContext;
} else {
@@ -784,10 +1056,9 @@ public class Vectorizer implements PhysicalPlanResolver {
class ReduceWorkVectorizationNodeProcessor extends
VectorizationNodeProcessor {
- private final List<String> reduceColumnNames;
- private final List<TypeInfo> reduceTypeInfos;
+ // Column names/types of the reduce input, built by getOnlyStructObjectInspectors.
+ // Assigned once, in the constructor.
+ private final VectorTaskColumnInfo vectorTaskColumnInfo;
private final boolean isTez;
private Operator<? extends OperatorDesc> rootVectorOp;
@@ -795,11 +1066,11 @@ public class Vectorizer implements PhysicalPlanResolver {
return rootVectorOp;
}
- public ReduceWorkVectorizationNodeProcessor(List<String> reduceColumnNames,
- List<TypeInfo> reduceTypeInfos, boolean isTez) {
+ /**
+  * @param vectorTaskColumnInfo reduce-side column information; its column names seed the
+  *        "__Reduce_Shuffle__" VectorizationContext of the root operator
+  * @param isTez the isTez flag passed down from vectorizeReduceWork
+  */
+ public ReduceWorkVectorizationNodeProcessor(VectorTaskColumnInfo
vectorTaskColumnInfo,
+ boolean isTez) {
+
super();
- this.reduceColumnNames = reduceColumnNames;
- this.reduceTypeInfos = reduceTypeInfos;
+ this.vectorTaskColumnInfo = vectorTaskColumnInfo;
rootVectorOp = null;
this.isTez = isTez;
}
@@ -815,15 +1086,11 @@ public class Vectorizer implements PhysicalPlanResolver {
boolean saveRootVectorOp = false;
if (op.getParentOperators().size() == 0) {
- LOG.info("ReduceWorkVectorizationNodeProcessor process
reduceColumnNames " + reduceColumnNames.toString());
+ LOG.info("ReduceWorkVectorizationNodeProcessor process
reduceColumnNames " + vectorTaskColumnInfo.columnNames.toString());
- vContext = new VectorizationContext("__Reduce_Shuffle__",
reduceColumnNames);
+ vContext = new VectorizationContext("__Reduce_Shuffle__",
vectorTaskColumnInfo.columnNames);
taskVectorizationContext = vContext;
- int i = 0;
- for (TypeInfo typeInfo : reduceTypeInfos) {
- taskColumnTypeNameMap.put(i, typeInfo.getTypeName());
- i++;
- }
+
saveRootVectorOp = true;
if (LOG.isDebugEnabled()) {
@@ -887,6 +1154,7 @@ public class Vectorizer implements PhysicalPlanResolver {
@Override
public PhysicalContext resolve(PhysicalContext physicalContext) throws
SemanticException {
+
hiveConf = physicalContext.getConf();
boolean vectorPath = HiveConf.getBoolVar(hiveConf,
@@ -1026,65 +1294,6 @@ public class Vectorizer implements PhysicalPlanResolver {
return false;
}
- String columns = "";
- String types = "";
- String partitionColumns = "";
- String partitionTypes = "";
- boolean haveInfo = false;
-
- // This over-reaches slightly, since we can have > 1 table-scan per
map-work.
- // It needs path to partition, path to alias, then check the alias == the
same table-scan, to be accurate.
- // That said, that is a TODO item to be fixed when we support >1
TableScans per vectorized pipeline later.
- LinkedHashMap<String, PartitionDesc> partitionDescs =
mWork.getPathToPartitionInfo();
-
- // For vectorization, compare each partition information for against the
others.
- // We assume the table information will be from one of the partitions, so
it will
- // work to focus on the partition information and not compare against the
TableScanOperator
- // columns (in the VectorizationContext)....
- for (Map.Entry<String, PartitionDesc> entry : partitionDescs.entrySet()) {
- PartitionDesc partDesc = entry.getValue();
- if (partDesc.getPartSpec() == null || partDesc.getPartSpec().isEmpty()) {
- // No partition information -- we match because we would default to
using the table description.
- continue;
- }
- Properties partProps = partDesc.getProperties();
- if (!haveInfo) {
- columns =
partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS);
- types =
partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMN_TYPES);
- partitionColumns =
partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
- partitionTypes =
partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
- haveInfo = true;
- } else {
- String nextColumns =
partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS);
- String nextTypes =
partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMN_TYPES);
- String nextPartitionColumns =
partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
- String nextPartitionTypes =
partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
- if (!columns.equalsIgnoreCase(nextColumns)) {
- LOG.info(
- String.format("Could not vectorize partition %s. Its column
names %s do not match the other column names %s",
- entry.getKey(), nextColumns, columns));
- return false;
- }
- if (!types.equalsIgnoreCase(nextTypes)) {
- LOG.info(
- String.format("Could not vectorize partition %s. Its column
types %s do not match the other column types %s",
- entry.getKey(), nextTypes, types));
- return false;
- }
- if (!partitionColumns.equalsIgnoreCase(nextPartitionColumns)) {
- LOG.info(
- String.format("Could not vectorize partition %s. Its
partition column names %s do not match the other partition column names %s",
- entry.getKey(), nextPartitionColumns,
partitionColumns));
- return false;
- }
- if (!partitionTypes.equalsIgnoreCase(nextPartitionTypes)) {
- LOG.info(
- String.format("Could not vectorize partition %s. Its
partition column types %s do not match the other partition column types %s",
- entry.getKey(), nextPartitionTypes,
partitionTypes));
- return false;
- }
- }
- }
return true;
}
@@ -1440,23 +1649,10 @@ public class Vectorizer implements PhysicalPlanResolver
{
return result;
}
- private VectorizationContext getVectorizationContext(RowSchema rowSchema,
String contextName,
- Map<Integer, String> typeNameMap) {
+ /**
+  * Creates the task-level VectorizationContext named contextName whose initial columns are
+  * vectorTaskColumnInfo.columnNames (the non-virtual TableScan columns collected during
+  * validation).
+  */
+ private VectorizationContext getVectorizationContext(String contextName,
+ VectorTaskColumnInfo vectorTaskColumnInfo) {
- VectorizationContext vContext = new VectorizationContext(contextName);
-
- // Add all non-virtual columns to make a vectorization context for
- // the TableScan operator.
- int i = 0;
- for (ColumnInfo c : rowSchema.getSignature()) {
- // Earlier, validation code should have eliminated virtual columns usage
(HIVE-5560).
- if (!isVirtualColumn(c)) {
- vContext.addInitialColumn(c.getInternalName());
- typeNameMap.put(i, c.getTypeName());
- i++;
- }
- }
- vContext.finishedAddingInitialColumns();
+ VectorizationContext vContext = new VectorizationContext(contextName,
vectorTaskColumnInfo.columnNames);
return vContext;
}
@@ -2011,12 +2207,16 @@ public class Vectorizer implements PhysicalPlanResolver
{
+ /**
+  * Logs at debug level the VectorizedRowBatchCtx attached to work: row column names and
+  * types, partition column count, and scratch column type names.
+  */
public void debugDisplayAllMaps(BaseWork work) {
- Map<String, Integer> columnNameMap = work.getVectorColumnNameMap();
- Map<Integer, String> columnTypeMap = work.getVectorColumnTypeMap();
- Map<Integer, String> scratchColumnTypeMap =
work.getVectorScratchColumnTypeMap();
+ VectorizedRowBatchCtx vectorizedRowBatchCtx = work.getVectorizedRowBatchCtx();
+ if (vectorizedRowBatchCtx == null) {
+ // Debug output must never fail the plan; nothing to show without a context.
+ LOG.debug("debugDisplayAllMaps no VectorizedRowBatchCtx");
+ return;
+ }
+
+ String[] columnNames = vectorizedRowBatchCtx.getRowColumnNames();
+ // NOTE(review): kept as Object and cast to Object[] for printing; if
+ // getRowColumnTypeInfos() returns TypeInfo[] this can be typed directly.
+ Object columnTypeInfos = vectorizedRowBatchCtx.getRowColumnTypeInfos();
+ int partitionColumnCount = vectorizedRowBatchCtx.getPartitionColumnCount();
+ String[] scratchColumnTypeNames = vectorizedRowBatchCtx.getScratchColumnTypeNames();
- LOG.debug("debugDisplayAllMaps columnNameMap " + columnNameMap.toString());
- LOG.debug("debugDisplayAllMaps columnTypeMap " + columnTypeMap.toString());
- LOG.debug("debugDisplayAllMaps scratchColumnTypeMap " +
scratchColumnTypeMap.toString());
+ LOG.debug("debugDisplayAllMaps columnNames {}", Arrays.toString(columnNames));
+ LOG.debug("debugDisplayAllMaps columnTypeInfos {}",
+ Arrays.deepToString((Object[]) columnTypeInfos));
+ LOG.debug("debugDisplayAllMaps partitionColumnCount {}", partitionColumnCount);
+ LOG.debug("debugDisplayAllMaps scratchColumnTypeNames {}",
+ Arrays.toString(scratchColumnTypeNames));
}
}