HIVE-11468: Vectorize Struct IN() clauses (Matt McCline, via Gopal V)
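The patch reduces a struct IN() test to the existing bytes-based string IN() test: the IN-list constants, and each row's struct fields at evaluate() time, are serialized with BinarySortableSerializeWrite into byte keys, so membership becomes a byte-for-byte lookup against a scratch BytesColumnVector column. A minimal sketch of that key encoding, using only the serde2 fast-path calls that appear in the diff below (the two-field struct and its values are illustrative, not from the patch):

import java.util.Arrays;

import org.apache.hadoop.hive.serde2.ByteStream.Output;
import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;

public class StructInKeySketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical predicate: STRUCT(id, name) IN (STRUCT(1, 'a'), ...),
    // i.e. a struct with a long field and a string field.
    Output buffer = new Output();
    BinarySortableSerializeWrite writer = new BinarySortableSerializeWrite(2);

    // Serialize one (1L, "a") tuple. The patch encodes the IN-list constants
    // and each batch row the same way, so equal tuples yield equal bytes.
    writer.set(buffer);
    writer.writeLong(1L);
    byte[] name = "a".getBytes("UTF-8");
    writer.writeString(name, 0, name.length);

    byte[] key = Arrays.copyOfRange(buffer.getData(), 0, buffer.getLength());
    System.out.println("serialized key: " + key.length + " bytes");
  }
}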
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7cfe3743
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7cfe3743
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7cfe3743

Branch: refs/heads/beeline-cli
Commit: 7cfe3743ff583386653bdd32c79f2c44ffe734ba
Parents: 2e8324e
Author: Gopal V <[email protected]>
Authored: Tue Sep 22 19:39:49 2015 -0700
Committer: Gopal V <[email protected]>
Committed: Tue Sep 22 23:24:14 2015 -0700

----------------------------------------------------------------------
 .../ql/exec/vector/VectorizationContext.java    |  203 +-
 .../expressions/FilterStringColumnInList.java   |   13 +-
 .../expressions/FilterStructColumnInList.java   |  178 ++
 .../exec/vector/expressions/IStructInExpr.java  |   36 +
 .../vector/expressions/StringColumnInList.java  |    4 +
 .../vector/expressions/StructColumnInList.java  |  174 ++
 .../hive/ql/optimizer/physical/Vectorizer.java  |   71 +-
 .../ql/optimizer/physical/Vectorizer.java.orig  | 1744 ++++++++++++++++++
 .../ql/optimizer/physical/Vectorizer.java.rej   |   86 +
 .../queries/clientpositive/vector_struct_in.q   |  247 +++
 .../clientpositive/vector_struct_in.q.out       |  825 +++++++++
 11 files changed, 3566 insertions(+), 15 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
index 2483196..46c2a78 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
@@ -104,20 +104,30 @@ import org.apache.hadoop.hive.ql.udf.UDFToLong;
 import org.apache.hadoop.hive.ql.udf.UDFToShort;
 import org.apache.hadoop.hive.ql.udf.UDFToString;
 import org.apache.hadoop.hive.ql.udf.generic.*;
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hive.common.util.DateUtils;
+
 /**
  * Context class for vectorization execution.
  * Main role is to map column names to column indices and serves as a
@@ -1273,17 +1283,208 @@ public class VectorizationContext {
     }
   }
 
+  public enum InConstantType {
+    INT_FAMILY,
+    TIMESTAMP,
+    DATE,
+    FLOAT_FAMILY,
+    STRING_FAMILY,
+    DECIMAL
+  }
+
+  public static InConstantType getInConstantTypeFromPrimitiveCategory(PrimitiveCategory primitiveCategory) {
+
+    switch (primitiveCategory) {
+    case BOOLEAN:
+    case BYTE:
+    case SHORT:
+    case INT:
+    case LONG:
+      return InConstantType.INT_FAMILY;
+
+    case DATE:
+      return InConstantType.DATE;
+
+    case TIMESTAMP:
+      return InConstantType.TIMESTAMP;
+
+    case FLOAT:
+    case DOUBLE:
+      return InConstantType.FLOAT_FAMILY;
+
+    case STRING:
+    case CHAR:
+    case VARCHAR:
+    case BINARY:
+      return InConstantType.STRING_FAMILY;
+
+    case DECIMAL:
+      return InConstantType.DECIMAL;
+
+    case INTERVAL_YEAR_MONTH:
+    case INTERVAL_DAY_TIME:
+      // UNDONE: Fall through for these... they don't appear to be supported yet.
+    default:
+      throw new RuntimeException("Unexpected primitive type category " + primitiveCategory);
+    }
+  }
+
+  private VectorExpression getStructInExpression(List<ExprNodeDesc> childExpr, ExprNodeDesc colExpr,
+      TypeInfo colTypeInfo, List<ExprNodeDesc> inChildren, Mode mode, TypeInfo returnType)
+          throws HiveException {
+
+    VectorExpression expr = null;
+
+    StructTypeInfo structTypeInfo = (StructTypeInfo) colTypeInfo;
+
+    ArrayList<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+    final int fieldCount = fieldTypeInfos.size();
+    ColumnVector.Type[] fieldVectorColumnTypes = new ColumnVector.Type[fieldCount];
+    InConstantType[] fieldInConstantTypes = new InConstantType[fieldCount];
+    for (int f = 0; f < fieldCount; f++) {
+      TypeInfo fieldTypeInfo = fieldTypeInfos.get(f);
+      // Only primitive fields are supported for now.
+      if (fieldTypeInfo.getCategory() != Category.PRIMITIVE) {
+        return null;
+      }
+
+      // We are going to serialize using the 4 basic types.
+      ColumnVector.Type fieldVectorColumnType = getColumnVectorTypeFromTypeInfo(fieldTypeInfo);
+      fieldVectorColumnTypes[f] = fieldVectorColumnType;
+
+      // We currently evaluate the IN (..) constants in special ways.
+      PrimitiveCategory fieldPrimitiveCategory =
+          ((PrimitiveTypeInfo) fieldTypeInfo).getPrimitiveCategory();
+      InConstantType inConstantType = getInConstantTypeFromPrimitiveCategory(fieldPrimitiveCategory);
+      fieldInConstantTypes[f] = inConstantType;
+    }
+
+    Output buffer = new Output();
+    BinarySortableSerializeWrite binarySortableSerializeWrite =
+        new BinarySortableSerializeWrite(fieldCount);
+
+    final int inChildrenCount = inChildren.size();
+    byte[][] serializedInChildren = new byte[inChildrenCount][];
+    try {
+      for (int i = 0; i < inChildrenCount; i++) {
+        final ExprNodeDesc node = inChildren.get(i);
+        final Object[] constants;
+
+        if (node instanceof ExprNodeConstantDesc) {
+          ExprNodeConstantDesc constNode = (ExprNodeConstantDesc) node;
+          ConstantObjectInspector output = constNode.getWritableObjectInspector();
+          constants = ((List<?>) output.getWritableConstantValue()).toArray();
+        } else {
+          ExprNodeGenericFuncDesc exprNode = (ExprNodeGenericFuncDesc) node;
+          ExprNodeEvaluator<?> evaluator = ExprNodeEvaluatorFactory
+              .get(exprNode);
+          ObjectInspector output = evaluator.initialize(exprNode
+              .getWritableObjectInspector());
+          constants = (Object[]) evaluator.evaluate(null);
+        }
+
+        binarySortableSerializeWrite.set(buffer);
+        for (int f = 0; f < fieldCount; f++) {
+          Object constant = constants[f];
+          if (constant == null) {
+            binarySortableSerializeWrite.writeNull();
+          } else {
+            InConstantType inConstantType = fieldInConstantTypes[f];
+            switch (inConstantType) {
+            case STRING_FAMILY:
+              {
+                byte[] bytes;
+                if (constant instanceof Text) {
+                  Text text = (Text) constant;
+                  bytes = text.getBytes();
+                  binarySortableSerializeWrite.writeString(bytes, 0, text.getLength());
+                } else {
+                  throw new HiveException("Unexpected constant String type " +
+                      constant.getClass().getSimpleName());
+                }
+              }
+              break;
+            case INT_FAMILY:
+              {
+                long value;
+                if (constant instanceof IntWritable) {
+                  value = ((IntWritable) constant).get();
+                } else if (constant instanceof LongWritable) {
+                  value = ((LongWritable) constant).get();
+                } else {
+                  throw new HiveException("Unexpected constant Long type " +
+                      constant.getClass().getSimpleName());
+                }
+                binarySortableSerializeWrite.writeLong(value);
+              }
+              break;
+
+            case FLOAT_FAMILY:
+              {
+                double value;
+                if (constant instanceof DoubleWritable) {
+                  value = ((DoubleWritable) constant).get();
+                } else {
+                  throw new HiveException("Unexpected constant Double type " +
+                      constant.getClass().getSimpleName());
+                }
+                binarySortableSerializeWrite.writeDouble(value);
+              }
+              break;
+
+            // UNDONE...
+            case DATE:
+            case TIMESTAMP:
+            case DECIMAL:
+            default:
+              throw new RuntimeException("Unexpected IN constant type " + inConstantType.name());
+            }
+          }
+        }
+        serializedInChildren[i] = Arrays.copyOfRange(buffer.getData(), 0, buffer.getLength());
+      }
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+
+    // Create a single child representing the scratch column where we will
+    // generate the serialized keys of the batch.
+    int scratchBytesCol = ocm.allocateOutputColumn("string");
+
+    Class<?> cl = (mode == Mode.FILTER ?
+        FilterStructColumnInList.class : StructColumnInList.class);
+
+    expr = createVectorExpression(cl, null, Mode.PROJECTION, returnType);
+
+    ((IStringInExpr) expr).setInListValues(serializedInChildren);
+
+    ((IStructInExpr) expr).setScratchBytesColumn(scratchBytesCol);
+    ((IStructInExpr) expr).setStructColumnExprs(this, colExpr.getChildren(),
+        fieldVectorColumnTypes);
+
+    return expr;
+  }
+
   /**
    * Create a filter or boolean-valued expression for column IN ( <list-of-constants> )
    */
   private VectorExpression getInExpression(List<ExprNodeDesc> childExpr, Mode mode, TypeInfo returnType)
       throws HiveException {
     ExprNodeDesc colExpr = childExpr.get(0);
+    List<ExprNodeDesc> inChildren = childExpr.subList(1, childExpr.size());
     String colType = colExpr.getTypeString();
+    colType = VectorizationContext.mapTypeNameSynonyms(colType);
+    TypeInfo colTypeInfo = TypeInfoUtils.getTypeInfoFromTypeString(colType);
+    Category category = colTypeInfo.getCategory();
+    if (category == Category.STRUCT) {
+      return getStructInExpression(childExpr, colExpr, colTypeInfo, inChildren, mode, returnType);
+    } else if (category != Category.PRIMITIVE) {
+      return null;
+    }
 
     // prepare arguments for createVectorExpression
-    List<ExprNodeDesc> childrenForInList = evaluateCastOnConstants(childExpr.subList(1, childExpr.size()));
+    List<ExprNodeDesc> childrenForInList = evaluateCastOnConstants(inChildren);
 
     /* This method assumes that the IN list has no NULL entries. That is enforced elsewhere,
      * in the Vectorizer class. If NULL is passed in as a list entry, behavior is not defined.

http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStringColumnInList.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStringColumnInList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStringColumnInList.java
index 2434e90..e34ec75 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStringColumnInList.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStringColumnInList.java
@@ -20,16 +20,7 @@ package org.apache.hadoop.hive.ql.exec.vector.expressions;
 
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Descriptor;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.udf.UDFLike;
-import org.apache.hadoop.io.Text;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 /**
  * Evaluate an IN filter on a batch for a vector of strings.
@@ -165,6 +156,10 @@ public class FilterStringColumnInList extends VectorExpression implements IStrin
     return "boolean";
   }
 
+  public void setInputColumn(int inputCol) {
+    this.inputCol = inputCol;
+  }
+
   @Override
   public int getOutputColumn() {
     return -1;

http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStructColumnInList.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStructColumnInList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStructColumnInList.java
new file mode 100644
index 0000000..00f22bb
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStructColumnInList.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Descriptor;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+
+/**
+ * Evaluate an IN filter on a batch for a vector of structs.
+ * This is optimized so that no objects have to be created in
+ * the inner loop, and there is a hash table implemented
+ * with Cuckoo hashing that has fast lookup to do the IN test.
+ */
+public class FilterStructColumnInList extends FilterStringColumnInList implements IStructInExpr {
+  private static final long serialVersionUID = 1L;
+  private VectorExpression[] structExpressions;
+  private ColumnVector.Type[] fieldVectorColumnTypes;
+  private int[] structColumnMap;
+  private int scratchBytesColumn;
+
+  private transient Output buffer;
+  private transient BinarySortableSerializeWrite binarySortableSerializeWrite;
+
+  /**
+   * After construction you must call setInListValues() to add the values to the IN set
+   * (on the IStringInExpr interface).
+   *
+   * And, call setScratchBytesColumn() and setStructColumnExprs() on the
+   * IStructInExpr interface.
+   */
+  public FilterStructColumnInList() {
+    super(-1);
+  }
+
+  @Override
+  public void evaluate(VectorizedRowBatch batch) {
+
+    final int logicalSize = batch.size;
+    if (logicalSize == 0) {
+      return;
+    }
+
+    if (buffer == null) {
+      buffer = new Output();
+      binarySortableSerializeWrite = new BinarySortableSerializeWrite(structColumnMap.length);
+    }
+
+    for (VectorExpression ve : structExpressions) {
+      ve.evaluate(batch);
+    }
+
+    BytesColumnVector scratchBytesColumnVector = (BytesColumnVector) batch.cols[scratchBytesColumn];
+
+    try {
+      boolean selectedInUse = batch.selectedInUse;
+      int[] selected = batch.selected;
+      for (int logical = 0; logical < logicalSize; logical++) {
+        int batchIndex = (selectedInUse ? selected[logical] : logical);
+
+        binarySortableSerializeWrite.set(buffer);
+        for (int f = 0; f < structColumnMap.length; f++) {
+          int fieldColumn = structColumnMap[f];
+          ColumnVector colVec = batch.cols[fieldColumn];
+          int adjustedIndex = (colVec.isRepeating ? 0 : batchIndex);
+          if (colVec.noNulls || !colVec.isNull[adjustedIndex]) {
+            switch (fieldVectorColumnTypes[f]) {
+            case BYTES:
+              {
+                BytesColumnVector bytesColVec = (BytesColumnVector) colVec;
+                byte[] bytes = bytesColVec.vector[adjustedIndex];
+                int start = bytesColVec.start[adjustedIndex];
+                int length = bytesColVec.length[adjustedIndex];
+                binarySortableSerializeWrite.writeString(bytes, start, length);
+              }
+              break;
+
+            case LONG:
+              binarySortableSerializeWrite.writeLong(((LongColumnVector) colVec).vector[adjustedIndex]);
+              break;
+
+            case DOUBLE:
+              binarySortableSerializeWrite.writeDouble(((DoubleColumnVector) colVec).vector[adjustedIndex]);
+              break;
+
+            case DECIMAL:
+              binarySortableSerializeWrite.writeHiveDecimal(
+                  ((DecimalColumnVector) colVec).vector[adjustedIndex].getHiveDecimal());
+              break;
+
+            default:
+              throw new RuntimeException("Unexpected vector column type " +
+                  fieldVectorColumnTypes[f].name());
+            }
+          } else {
+            binarySortableSerializeWrite.writeNull();
+          }
+        }
+        scratchBytesColumnVector.setVal(batchIndex, buffer.getData(), 0, buffer.getLength());
+      }
+
+      // Now, take the serialized keys we just wrote into our scratch column and look them
+      // up in the IN list.
+      super.evaluate(batch);
+
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public String getOutputType() {
+    return "boolean";
+  }
+
+  @Override
+  public int getOutputColumn() {
+    return -1;
+  }
+
+  @Override
+  public Descriptor getDescriptor() {
+
+    // This VectorExpression (IN) is a special case, so don't return a descriptor.
+    return null;
+  }
+
+  @Override
+  public void setScratchBytesColumn(int scratchBytesColumn) {
+
+    // Tell our super class FilterStringColumnInList it will be evaluating our scratch
+    // BytesColumnVector.
+    super.setInputColumn(scratchBytesColumn);
+    this.scratchBytesColumn = scratchBytesColumn;
+  }
+
+  @Override
+  public void setStructColumnExprs(VectorizationContext vContext,
+      List<ExprNodeDesc> structColumnExprs, ColumnVector.Type[] fieldVectorColumnTypes)
+          throws HiveException {
+
+    structExpressions = vContext.getVectorExpressions(structColumnExprs);
+    structColumnMap = new int[structExpressions.length];
+    for (int i = 0; i < structColumnMap.length; i++) {
+      VectorExpression ve = structExpressions[i];
+      structColumnMap[i] = ve.getOutputColumn();
+    }
+    this.fieldVectorColumnTypes = fieldVectorColumnTypes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/IStructInExpr.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/IStructInExpr.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/IStructInExpr.java
new file mode 100644
index 0000000..3b25255
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/IStructInExpr.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+
+/**
+ * Interface used for both filter and non-filter versions of IN to simplify
+ * VectorizationContext code.
+ */
+public interface IStructInExpr {
+  void setScratchBytesColumn(int scratchBytesColumn);
+  void setStructColumnExprs(VectorizationContext vContext, List<ExprNodeDesc> structColumnExprs,
+      ColumnVector.Type[] fieldVectorColumnTypes) throws HiveException;
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringColumnInList.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringColumnInList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringColumnInList.java
index 03833a2..b90e3c0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringColumnInList.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringColumnInList.java
@@ -140,6 +140,10 @@ public class StringColumnInList extends VectorExpression implements IStringInExp
     return "boolean";
   }
 
+  public void setInputColumn(int inputCol) {
+    this.inputCol = inputCol;
+  }
+
   @Override
   public int getOutputColumn() {
     return this.outputColumn;

http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StructColumnInList.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StructColumnInList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StructColumnInList.java
new file mode 100644
index 0000000..724497a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StructColumnInList.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Descriptor;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+
+/**
+ * Evaluate an IN boolean expression (not a filter) on a batch for a vector of structs.
+ * This is optimized so that no objects have to be created in
+ * the inner loop, and there is a hash table implemented
+ * with Cuckoo hashing that has fast lookup to do the IN test.
+ */
+public class StructColumnInList extends StringColumnInList implements IStructInExpr {
+  private static final long serialVersionUID = 1L;
+  private VectorExpression[] structExpressions;
+  private ColumnVector.Type[] fieldVectorColumnTypes;
+  private int[] structColumnMap;
+  private int scratchBytesColumn;
+
+  private transient Output buffer;
+  private transient BinarySortableSerializeWrite binarySortableSerializeWrite;
+
+  public StructColumnInList() {
+    super();
+  }
+
+  /**
+   * After construction you must call setInListValues() to add the values to the IN set.
+   */
+  public StructColumnInList(int outputColumn) {
+    super(-1, outputColumn);
+  }
+
+  @Override
+  public void evaluate(VectorizedRowBatch batch) {
+
+    final int logicalSize = batch.size;
+    if (logicalSize == 0) {
+      return;
+    }
+
+    if (buffer == null) {
+      buffer = new Output();
+      binarySortableSerializeWrite = new BinarySortableSerializeWrite(structColumnMap.length);
+    }
+
+    for (VectorExpression ve : structExpressions) {
+      ve.evaluate(batch);
+    }
+
+    BytesColumnVector scratchBytesColumnVector = (BytesColumnVector) batch.cols[scratchBytesColumn];
+
+    try {
+      boolean selectedInUse = batch.selectedInUse;
+      int[] selected = batch.selected;
+      for (int logical = 0; logical < logicalSize; logical++) {
+        int batchIndex = (selectedInUse ? selected[logical] : logical);
+
+        binarySortableSerializeWrite.set(buffer);
+        for (int f = 0; f < structColumnMap.length; f++) {
+          int fieldColumn = structColumnMap[f];
+          ColumnVector colVec = batch.cols[fieldColumn];
+          int adjustedIndex = (colVec.isRepeating ? 0 : batchIndex);
+          if (colVec.noNulls || !colVec.isNull[adjustedIndex]) {
+            switch (fieldVectorColumnTypes[f]) {
+            case BYTES:
+              {
+                BytesColumnVector bytesColVec = (BytesColumnVector) colVec;
+                byte[] bytes = bytesColVec.vector[adjustedIndex];
+                int start = bytesColVec.start[adjustedIndex];
+                int length = bytesColVec.length[adjustedIndex];
+                binarySortableSerializeWrite.writeString(bytes, start, length);
+              }
+              break;
+
+            case LONG:
+              binarySortableSerializeWrite.writeLong(((LongColumnVector) colVec).vector[adjustedIndex]);
+              break;
+
+            case DOUBLE:
+              binarySortableSerializeWrite.writeDouble(((DoubleColumnVector) colVec).vector[adjustedIndex]);
+              break;
+
+            case DECIMAL:
+              binarySortableSerializeWrite.writeHiveDecimal(
+                  ((DecimalColumnVector) colVec).vector[adjustedIndex].getHiveDecimal());
+              break;
+
+            default:
+              throw new RuntimeException("Unexpected vector column type " +
+                  fieldVectorColumnTypes[f].name());
+            }
+          } else {
+            binarySortableSerializeWrite.writeNull();
+          }
+        }
+        scratchBytesColumnVector.setVal(batchIndex, buffer.getData(), 0, buffer.getLength());
+      }
+
+      // Now, take the serialized keys we just wrote into our scratch column and look them
+      // up in the IN list.
+      super.evaluate(batch);
+
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public String getOutputType() {
+    return "boolean";
+  }
+
+  @Override
+  public Descriptor getDescriptor() {
+
+    // This VectorExpression (IN) is a special case, so don't return a descriptor.
+    return null;
+  }
+
+  @Override
+  public void setScratchBytesColumn(int scratchBytesColumn) {
+
+    // Tell our super class StringColumnInList it will be evaluating our scratch
+    // BytesColumnVector.
+    super.setInputColumn(scratchBytesColumn);
+    this.scratchBytesColumn = scratchBytesColumn;
+  }
+
+  @Override
+  public void setStructColumnExprs(VectorizationContext vContext,
+      List<ExprNodeDesc> structColumnExprs, ColumnVector.Type[] fieldVectorColumnTypes)
+          throws HiveException {
+
+    structExpressions = vContext.getVectorExpressions(structColumnExprs);
+    structColumnMap = new int[structExpressions.length];
+    for (int i = 0; i < structColumnMap.length; i++) {
+      VectorExpression ve = structExpressions[i];
+      structColumnMap[i] = ve.getOutputColumn();
+    }
+    this.fieldVectorColumnTypes = fieldVectorColumnTypes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/7cfe3743/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 0d4c1d8..da1d9eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -53,10 +53,12 @@ import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinLeftSemiString
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterLongOperator;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterMultiKeyOperator;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterStringOperator;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOuterFilteredOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext.InConstantType;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
@@ -139,8 +141,11 @@ import org.apache.hadoop.hive.ql.udf.UDFYear;
 import org.apache.hadoop.hive.ql.udf.generic.*;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -575,7 +580,12 @@ public class Vectorizer implements PhysicalPlanResolver {
         if (nonVectorizableChildOfGroupBy(op)) {
           return new Boolean(true);
         }
-        boolean ret = validateMapWorkOperator(op, mapWork, isTez);
+        boolean ret;
+        try {
+          ret = validateMapWorkOperator(op, mapWork, isTez);
+        } catch (Exception e) {
+          throw new SemanticException(e);
+        }
         if (!ret) {
           LOG.info("MapWork Operator: " + op.getName() + " could not be vectorized.");
           return new Boolean(false);
@@ -1260,6 +1270,7 @@ public class Vectorizer implements PhysicalPlanResolver {
       LOG.info("Cannot vectorize " + desc.toString() + " of type " + typeName);
       return false;
     }
+    boolean isInExpression = false;
     if (desc instanceof ExprNodeGenericFuncDesc) {
       ExprNodeGenericFuncDesc d = (ExprNodeGenericFuncDesc) desc;
       boolean r = validateGenericUdf(d);
@@ -1267,12 +1278,62 @@ public class Vectorizer implements PhysicalPlanResolver {
         LOG.info("Cannot vectorize UDF " + d);
         return false;
       }
+      GenericUDF genericUDF = d.getGenericUDF();
+      isInExpression = (genericUDF instanceof GenericUDFIn);
     }
     if (desc.getChildren() != null) {
-      for (ExprNodeDesc d: desc.getChildren()) {
-        // Don't restrict child expressions for projection. Always use looser FILTER mode.
-        boolean r = validateExprNodeDescRecursive(d, VectorExpressionDescriptor.Mode.FILTER);
-        if (!r) {
+      if (isInExpression
+          && desc.getChildren().get(0).getTypeInfo().getCategory() == Category.STRUCT) {
+        // Don't restrict child expressions for projection.
+        // Always use loose FILTER mode.
+        if (!validateStructInExpression(desc, VectorExpressionDescriptor.Mode.FILTER)) {
+          return false;
+        }
+      } else {
+        for (ExprNodeDesc d : desc.getChildren()) {
+          // Don't restrict child expressions for projection.
+          // Always use loose FILTER mode.
+          if (!validateExprNodeDescRecursive(d, VectorExpressionDescriptor.Mode.FILTER)) {
+            return false;
+          }
+        }
+      }
+    }
+    return true;
+  }
+
+  private boolean validateStructInExpression(ExprNodeDesc desc,
+      VectorExpressionDescriptor.Mode mode) {
+    for (ExprNodeDesc d : desc.getChildren()) {
+      TypeInfo typeInfo = d.getTypeInfo();
+      if (typeInfo.getCategory() != Category.STRUCT) {
+        return false;
+      }
+      StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+
+      ArrayList<TypeInfo> fieldTypeInfos = structTypeInfo
+          .getAllStructFieldTypeInfos();
+      ArrayList<String> fieldNames = structTypeInfo.getAllStructFieldNames();
+      final int fieldCount = fieldTypeInfos.size();
+      for (int f = 0; f < fieldCount; f++) {
+        TypeInfo fieldTypeInfo = fieldTypeInfos.get(f);
+        Category category = fieldTypeInfo.getCategory();
+        if (category != Category.PRIMITIVE) {
+          LOG.info("Cannot vectorize struct field " + fieldNames.get(f)
+              + " of type " + fieldTypeInfo.getTypeName());
+          return false;
+        }
+        PrimitiveTypeInfo fieldPrimitiveTypeInfo = (PrimitiveTypeInfo) fieldTypeInfo;
+        InConstantType inConstantType = VectorizationContext
+            .getInConstantTypeFromPrimitiveCategory(fieldPrimitiveTypeInfo
+                .getPrimitiveCategory());
+
+        // For now, limit the data types we support for Vectorized Struct IN().
+        if (inConstantType != InConstantType.INT_FAMILY
+            && inConstantType != InConstantType.FLOAT_FAMILY
+            && inConstantType != InConstantType.STRING_FAMILY) {
+          LOG.info("Cannot vectorize struct field " + fieldNames.get(f)
+              + " of type " + fieldTypeInfo.getTypeName());
          return false;
        }
      }
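The wiring between these pieces happens in getStructInExpression() above: it picks the filter or projection variant, hands over the pre-serialized constants through the inherited IStringInExpr interface, and supplies the struct-specific plumbing through IStructInExpr. A condensed sketch of that handshake (all names come from the diff above; this is a recap of the patch's own calls, not additional API):

// Pick the filter or projection variant of the struct IN expression.
Class<?> cl = (mode == Mode.FILTER ?
    FilterStructColumnInList.class : StructColumnInList.class);
VectorExpression expr = createVectorExpression(cl, null, Mode.PROJECTION, returnType);

// IN-list constants, pre-serialized with BinarySortableSerializeWrite.
((IStringInExpr) expr).setInListValues(serializedInChildren);

// Scratch bytes column that evaluate() fills with per-row keys, plus the
// vectorized field expressions whose output columns feed the serializer.
((IStructInExpr) expr).setScratchBytesColumn(scratchBytesCol);
((IStructInExpr) expr).setStructColumnExprs(this, colExpr.getChildren(),
    fieldVectorColumnTypes);

At evaluate() time the flow is the mirror image: each row's fields are serialized into the scratch BytesColumnVector with the same binary-sortable encoding, and the inherited string IN lookup then compares keys byte-for-byte.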
