http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
index 525b3c5..41dc3e1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
@@ -17,12 +17,9 @@
  */
 package org.apache.hadoop.hive.ql.exec.vector;
 
+import java.io.IOException;
 import java.util.Arrays;
 
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-
 /**
  * This class represents a nullable double precision floating point column vector.
  * This class will be used for operations on all floating point types (float, double)
@@ -36,7 +33,6 @@ import org.apache.hadoop.io.Writable;
  */
 public class DoubleColumnVector extends ColumnVector {
   public double[] vector;
-  private final DoubleWritable writableObj = new DoubleWritable();
   public static final double NULL_VALUE = Double.NaN;
 
   /**
@@ -57,19 +53,6 @@ public class DoubleColumnVector extends ColumnVector {
     vector = new double[len];
   }
 
-  @Override
-  public Writable getWritableObject(int index) {
-    if (this.isRepeating) {
-      index = 0;
-    }
-    if (!noNulls && isNull[index]) {
-      return NullWritable.get();
-    } else {
-      writableObj.set(vector[index]);
-      return writableObj;
-    }
-  }
-
   // Copy the current object contents into the output. Only copy selected entries,
   // as indicated by selectedInUse and the sel array.
   public void copySelected(
@@ -121,6 +104,14 @@ public class DoubleColumnVector extends ColumnVector {
     vector[0] = value;
   }
 
+  // Fill the column vector with nulls
+  public void fillWithNulls() {
+    noNulls = false;
+    isRepeating = true;
+    vector[0] = NULL_VALUE;
+    isNull[0] = true;
+  }
+
   // Simplify vector by brute-force flattening noNulls and isRepeating
   // This can be used to reduce combinatorial explosion of code paths in VectorExpressions
   // with many arguments.
@@ -144,6 +135,44 @@ public class DoubleColumnVector extends ColumnVector {
 
   @Override
   public void setElement(int outElementNum, int inputElementNum, ColumnVector inputVector) {
-    vector[outElementNum] = ((DoubleColumnVector) inputVector).vector[inputElementNum];
+    if (inputVector.isRepeating) {
+      inputElementNum = 0;
+    }
+    if (inputVector.noNulls || !inputVector.isNull[inputElementNum]) {
+      isNull[outElementNum] = false;
+      vector[outElementNum] =
+          ((DoubleColumnVector) inputVector).vector[inputElementNum];
+    } else {
+      isNull[outElementNum] = true;
+      noNulls = false;
+    }
+  }
+
+  @Override
+  public void stringifyValue(StringBuilder buffer, int row) {
+    if (isRepeating) {
+      row = 0;
+    }
+    if (noNulls || !isNull[row]) {
+      buffer.append(vector[row]);
+    } else {
+      buffer.append("null");
+    }
+  }
+
+  @Override
+  public void ensureSize(int size, boolean preserveData) {
+    if (size > vector.length) {
+      super.ensureSize(size, preserveData);
+      double[] oldArray = vector;
+      vector = new double[size];
+      if (preserveData) {
+        if (isRepeating) {
+          vector[0] = oldArray[0];
+        } else {
+          System.arraycopy(oldArray, 0, vector, 0 , oldArray.length);
+        }
+      }
+    }
   }
 }
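The DoubleColumnVector change above (and the parallel LongColumnVector change below) replaces the old getWritableObject() accessor with an explicit null-aware contract: fillWithNulls() turns the whole column into a repeating NULL, setElement() now propagates isNull/noNulls from the source vector instead of blindly copying the value, stringifyValue() renders either the value or "null", and ensureSize() grows the backing array while optionally preserving data. The sketch below is illustrative only and is not part of this commit; the demo class name, column sizes, sample values, and expected output are hypothetical, and it uses the testing-only DoubleColumnVector(int) constructor shown in the diff.

// Hypothetical usage sketch of the null-handling contract introduced by this patch.
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;

public class DoubleColumnVectorNullDemo {
  public static void main(String[] args) {
    // Testing-only constructor; normal code gets vectors from a VectorizedRowBatch.
    DoubleColumnVector src = new DoubleColumnVector(4);
    src.noNulls = false;
    src.vector[0] = 1.5;        // row 0 holds a value
    src.isNull[1] = true;       // row 1 is NULL

    DoubleColumnVector dst = new DoubleColumnVector(4);
    dst.setElement(0, 0, src);  // copies 1.5 and clears dst.isNull[0]
    dst.setElement(1, 1, src);  // marks dst.isNull[1] and sets dst.noNulls = false

    StringBuilder sb = new StringBuilder();
    dst.stringifyValue(sb, 0);  // appends "1.5"
    sb.append(' ');
    dst.stringifyValue(sb, 1);  // appends "null"
    System.out.println(sb);     // expected: 1.5 null

    dst.ensureSize(8, true);    // grows the backing array, keeping existing rows
    dst.fillWithNulls();        // whole column becomes a repeating NULL
  }
}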
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java index f0545fe..0afe5db 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java @@ -17,12 +17,9 @@ */ package org.apache.hadoop.hive.ql.exec.vector; +import java.io.IOException; import java.util.Arrays; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; - /** * This class represents a nullable int column vector. * This class will be used for operations on all integer types (tinyint, smallint, int, bigint) @@ -36,7 +33,6 @@ import org.apache.hadoop.io.Writable; */ public class LongColumnVector extends ColumnVector { public long[] vector; - private final LongWritable writableObj = new LongWritable(); public static final long NULL_VALUE = 1; /** @@ -50,26 +46,13 @@ public class LongColumnVector extends ColumnVector { /** * Don't use this except for testing purposes. * - * @param len + * @param len the number of rows */ public LongColumnVector(int len) { super(len); vector = new long[len]; } - @Override - public Writable getWritableObject(int index) { - if (this.isRepeating) { - index = 0; - } - if (!noNulls && isNull[index]) { - return NullWritable.get(); - } else { - writableObj.set(vector[index]); - return writableObj; - } - } - // Copy the current object contents into the output. Only copy selected entries, // as indicated by selectedInUse and the sel array. public void copySelected( @@ -141,7 +124,9 @@ public class LongColumnVector extends ColumnVector { } } else { - System.arraycopy(vector, 0, output.vector, 0, size); + for(int i = 0; i < size; ++i) { + output.vector[i] = vector[i]; + } } // Copy nulls over if needed @@ -165,6 +150,14 @@ public class LongColumnVector extends ColumnVector { vector[0] = value; } + // Fill the column vector with nulls + public void fillWithNulls() { + noNulls = false; + isRepeating = true; + vector[0] = NULL_VALUE; + isNull[0] = true; + } + // Simplify vector by brute-force flattening noNulls and isRepeating // This can be used to reduce combinatorial explosion of code paths in VectorExpressions // with many arguments. 
@@ -188,6 +181,44 @@ public class LongColumnVector extends ColumnVector { @Override public void setElement(int outElementNum, int inputElementNum, ColumnVector inputVector) { - vector[outElementNum] = ((LongColumnVector) inputVector).vector[inputElementNum]; + if (inputVector.isRepeating) { + inputElementNum = 0; + } + if (inputVector.noNulls || !inputVector.isNull[inputElementNum]) { + isNull[outElementNum] = false; + vector[outElementNum] = + ((LongColumnVector) inputVector).vector[inputElementNum]; + } else { + isNull[outElementNum] = true; + noNulls = false; + } + } + + @Override + public void stringifyValue(StringBuilder buffer, int row) { + if (isRepeating) { + row = 0; + } + if (noNulls || !isNull[row]) { + buffer.append(vector[row]); + } else { + buffer.append("null"); + } + } + + @Override + public void ensureSize(int size, boolean preserveData) { + if (size > vector.length) { + super.ensureSize(size, preserveData); + long[] oldArray = vector; + vector = new long[size]; + if (preserveData) { + if (isRepeating) { + vector[0] = oldArray[0]; + } else { + System.arraycopy(oldArray, 0, vector, 0 , oldArray.length); + } + } + } } } http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java index 8452abd..56cf9ba 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java @@ -31,15 +31,10 @@ import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth; import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.fast.DeserializeRead; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; import org.apache.hive.common.util.DateUtils; @@ -61,12 +56,12 @@ public class VectorDeserializeRow { private Reader[] readersByValue; private Reader[] readersByReference; - private PrimitiveTypeInfo[] primitiveTypeInfos; + private TypeInfo[] typeInfos; public VectorDeserializeRow(DeserializeRead deserializeRead) { this(); this.deserializeRead = deserializeRead; - primitiveTypeInfos = deserializeRead.primitiveTypeInfos(); + typeInfos = deserializeRead.typeInfos(); } @@ -564,7 +559,7 @@ public class VectorDeserializeRow { Reader readerByValue = null; Reader readerByReference = null; - PrimitiveTypeInfo primitiveTypeInfo = primitiveTypeInfos[index]; + PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfos[index]; PrimitiveCategory primitiveCategory = primitiveTypeInfo.getPrimitiveCategory(); switch (primitiveCategory) { // case VOID: @@ -642,10 
+637,10 @@ public class VectorDeserializeRow { public void init(int[] outputColumns) throws HiveException { - readersByValue = new Reader[primitiveTypeInfos.length]; - readersByReference = new Reader[primitiveTypeInfos.length]; + readersByValue = new Reader[typeInfos.length]; + readersByReference = new Reader[typeInfos.length]; - for (int i = 0; i < primitiveTypeInfos.length; i++) { + for (int i = 0; i < typeInfos.length; i++) { int outputColumn = outputColumns[i]; addReader(i, outputColumn); } @@ -653,10 +648,10 @@ public class VectorDeserializeRow { public void init(List<Integer> outputColumns) throws HiveException { - readersByValue = new Reader[primitiveTypeInfos.length]; - readersByReference = new Reader[primitiveTypeInfos.length]; + readersByValue = new Reader[typeInfos.length]; + readersByReference = new Reader[typeInfos.length]; - for (int i = 0; i < primitiveTypeInfos.length; i++) { + for (int i = 0; i < typeInfos.length; i++) { int outputColumn = outputColumns.get(i); addReader(i, outputColumn); } @@ -664,10 +659,10 @@ public class VectorDeserializeRow { public void init(int startColumn) throws HiveException { - readersByValue = new Reader[primitiveTypeInfos.length]; - readersByReference = new Reader[primitiveTypeInfos.length]; + readersByValue = new Reader[typeInfos.length]; + readersByReference = new Reader[typeInfos.length]; - for (int i = 0; i < primitiveTypeInfos.length; i++) { + for (int i = 0; i < typeInfos.length; i++) { int outputColumn = startColumn + i; addReader(i, outputColumn); } @@ -709,14 +704,14 @@ public class VectorDeserializeRow { private void throwMoreDetailedException(IOException e, int index) throws EOFException { StringBuilder sb = new StringBuilder(); - sb.append("Detail: \"" + e.toString() + "\" occured for field " + index + " of " + primitiveTypeInfos.length + " fields ("); - for (int i = 0; i < primitiveTypeInfos.length; i++) { + sb.append("Detail: \"" + e.toString() + "\" occured for field " + index + " of " + typeInfos.length + " fields ("); + for (int i = 0; i < typeInfos.length; i++) { if (i > 0) { sb.append(", "); } - sb.append(primitiveTypeInfos[i].getPrimitiveCategory().name()); + sb.append(((PrimitiveTypeInfo) typeInfos[i]).getPrimitiveCategory().name()); } sb.append(")"); throw new EOFException(sb.toString()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java index ee6939d..9774f0c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java @@ -468,6 +468,9 @@ public abstract class VectorExtractRow { int start = colVector.start[adjustedIndex]; int length = colVector.length[adjustedIndex]; + if (value == null) { + LOG.info("null string entry: batchIndex " + batchIndex + " columnIndex " + columnIndex); + } // Use org.apache.hadoop.io.Text as our helper to go from byte[] to String. 
text.set(value, start, length); @@ -727,9 +730,9 @@ public abstract class VectorExtractRow { } public void extractRow(int batchIndex, Object[] objects) { - int i = 0; - for (Extractor extracter : extracters) { - objects[i++] = extracter.extract(batchIndex); + for (int i = 0; i < extracters.length; i++) { + Extractor extracter = extracters[i]; + objects[i] = extracter.extract(batchIndex); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java index 39a83e3..fa66964 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java @@ -814,7 +814,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements outputFieldNames, objectInspectors); if (isVectorOutput) { vrbCtx = new VectorizedRowBatchCtx(); - vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) outputObjInspector); + vrbCtx.init((StructObjectInspector) outputObjInspector, vOutContext.getScratchColumnTypeNames()); outputBatch = vrbCtx.createVectorizedRowBatch(); vectorAssignRowSameBatch = new VectorAssignRowSameBatch(); vectorAssignRowSameBatch.init((StructObjectInspector) outputObjInspector, vOutContext.getProjectedColumns()); http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java index 0baec2c..9920e9c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java @@ -91,7 +91,7 @@ public class VectorMapJoinBaseOperator extends MapJoinOperator implements Vector Collection<Future<?>> result = super.initializeOp(hconf); vrbCtx = new VectorizedRowBatchCtx(); - vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) this.outputObjInspector); + vrbCtx.init((StructObjectInspector) this.outputObjInspector, vOutContext.getScratchColumnTypeNames()); outputBatch = vrbCtx.createVectorizedRowBatch(); @@ -182,4 +182,4 @@ public class VectorMapJoinBaseOperator extends MapJoinOperator implements Vector public VectorizationContext getOuputVectorizationContext() { return vOutContext; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java index 804ba17..66190ae 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java @@ -146,7 +146,7 @@ public class VectorSMBMapJoinOperator extends SMBMapJoinOperator implements 
Vect Collection<Future<?>> result = super.initializeOp(hconf); vrbCtx = new VectorizedRowBatchCtx(); - vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) this.outputObjInspector); + vrbCtx.init((StructObjectInspector) this.outputObjInspector, vOutContext.getScratchColumnTypeNames()); outputBatch = vrbCtx.createVectorizedRowBatch(); http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSerializeRow.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSerializeRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSerializeRow.java index 342bf67..5586944 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSerializeRow.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSerializeRow.java @@ -22,8 +22,6 @@ import java.io.IOException; import java.sql.Timestamp; import java.util.List; -import org.apache.hadoop.hive.common.type.HiveIntervalDayTime; -import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.ByteStream.Output; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; @@ -604,13 +602,13 @@ public class VectorSerializeRow { } } - public void init(PrimitiveTypeInfo[] primitiveTypeInfos, List<Integer> columnMap) + public void init(TypeInfo[] typeInfos, int[] columnMap) throws HiveException { - writers = new Writer[primitiveTypeInfos.length]; - for (int i = 0; i < primitiveTypeInfos.length; i++) { - int columnIndex = columnMap.get(i); - Writer writer = createWriter(primitiveTypeInfos[i], columnIndex); + writers = new Writer[typeInfos.length]; + for (int i = 0; i < typeInfos.length; i++) { + int columnIndex = columnMap[i]; + Writer writer = createWriter(typeInfos[i], columnIndex); writers[i] = writer; } } http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java index da89e38..ea03099 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java @@ -145,6 +145,8 @@ public class VectorizationContext { VectorExpressionDescriptor vMap; + private List<String> initialColumnNames; + private List<Integer> projectedColumns; private List<String> projectionColumnNames; private Map<String, Integer> projectionColumnMap; @@ -158,7 +160,11 @@ public class VectorizationContext { public VectorizationContext(String contextName, List<String> initialColumnNames) { this.contextName = contextName; level = 0; - LOG.info("VectorizationContext consructor contextName " + contextName + " level " + level + " initialColumnNames " + initialColumnNames.toString()); + if (LOG.isDebugEnabled()) { + LOG.debug("VectorizationContext consructor contextName " + contextName + " level " + + level + " initialColumnNames " + initialColumnNames); + } + this.initialColumnNames = initialColumnNames; this.projectionColumnNames = initialColumnNames; projectedColumns = new ArrayList<Integer>(); @@ -178,8 +184,11 @@ public class VectorizationContext { public VectorizationContext(String contextName) { 
this.contextName = contextName; level = 0; - LOG.info("VectorizationContext consructor contextName " + contextName + " level " + level); - projectedColumns = new ArrayList<Integer>(); + if (LOG.isDebugEnabled()) { + LOG.debug("VectorizationContext consructor contextName " + contextName + " level " + level); + } + initialColumnNames = new ArrayList<String>(); + projectedColumns = new ArrayList<Integer>(); projectionColumnNames = new ArrayList<String>(); projectionColumnMap = new HashMap<String, Integer>(); this.ocm = new OutputColumnManager(0); @@ -194,6 +203,7 @@ public class VectorizationContext { this.contextName = contextName; level = vContext.level + 1; LOG.info("VectorizationContext consructor reference contextName " + contextName + " level " + level); + this.initialColumnNames = vContext.initialColumnNames; this.projectedColumns = new ArrayList<Integer>(); this.projectionColumnNames = new ArrayList<String>(); this.projectionColumnMap = new HashMap<String, Integer>(); @@ -206,6 +216,7 @@ public class VectorizationContext { // Add an initial column to a vectorization context when // a vectorized row batch is being created. public void addInitialColumn(String columnName) { + initialColumnNames.add(columnName); int index = projectedColumns.size(); projectedColumns.add(index); projectionColumnNames.add(columnName); @@ -234,6 +245,10 @@ public class VectorizationContext { projectionColumnMap.put(columnName, vectorBatchColIndex); } + public List<String> getInitialColumnNames() { + return initialColumnNames; + } + public List<Integer> getProjectedColumns() { return projectedColumns; } @@ -2303,36 +2318,51 @@ public class VectorizationContext { } public static ColumnVector.Type getColumnVectorTypeFromTypeInfo(TypeInfo typeInfo) throws HiveException { - PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo; - PrimitiveCategory primitiveCategory = primitiveTypeInfo.getPrimitiveCategory(); - - switch (primitiveCategory) { - case BOOLEAN: - case BYTE: - case SHORT: - case INT: - case LONG: - case DATE: - case TIMESTAMP: - case INTERVAL_YEAR_MONTH: - case INTERVAL_DAY_TIME: - return ColumnVector.Type.LONG; - - case FLOAT: - case DOUBLE: - return ColumnVector.Type.DOUBLE; - - case STRING: - case CHAR: - case VARCHAR: - case BINARY: - return ColumnVector.Type.BYTES; - - case DECIMAL: - return ColumnVector.Type.DECIMAL; - - default: - throw new HiveException("Unexpected primitive type category " + primitiveCategory); + switch (typeInfo.getCategory()) { + case STRUCT: + return Type.STRUCT; + case UNION: + return Type.UNION; + case LIST: + return Type.LIST; + case MAP: + return Type.MAP; + case PRIMITIVE: { + PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo; + PrimitiveCategory primitiveCategory = primitiveTypeInfo.getPrimitiveCategory(); + + switch (primitiveCategory) { + case BOOLEAN: + case BYTE: + case SHORT: + case INT: + case LONG: + case DATE: + case TIMESTAMP: + case INTERVAL_YEAR_MONTH: + case INTERVAL_DAY_TIME: + return ColumnVector.Type.LONG; + + case FLOAT: + case DOUBLE: + return ColumnVector.Type.DOUBLE; + + case STRING: + case CHAR: + case VARCHAR: + case BINARY: + return ColumnVector.Type.BYTES; + + case DECIMAL: + return ColumnVector.Type.DECIMAL; + + default: + throw new RuntimeException("Unexpected primitive type category " + primitiveCategory); + } + } + default: + throw new RuntimeException("Unexpected type category " + + typeInfo.getCategory()); } } @@ -2442,13 +2472,16 @@ public class VectorizationContext { return firstOutputColumnIndex; } - public 
Map<Integer, String> getScratchColumnTypeMap() { - Map<Integer, String> map = new HashMap<Integer, String>(); + public String[] getScratchColumnTypeNames() { + String[] result = new String[ocm.outputColCount]; for (int i = 0; i < ocm.outputColCount; i++) { - String type = ocm.outputColumnsTypes[i]; - map.put(i+this.firstOutputColumnIndex, type); + String typeName = ocm.outputColumnsTypes[i]; + if (typeName.equalsIgnoreCase("long")) { + typeName = "bigint"; // Convert our synonym to a real Hive type name. + } + result[i] = typeName; } - return map; + return result; } @Override @@ -2468,9 +2501,7 @@ public class VectorizationContext { } sb.append("sorted projectionColumnMap ").append(sortedColumnMap).append(", "); - Map<Integer, String> sortedScratchColumnTypeMap = new TreeMap<Integer, String>(comparerInteger); - sortedScratchColumnTypeMap.putAll(getScratchColumnTypeMap()); - sb.append("sorted scratchColumnTypeMap ").append(sortedScratchColumnTypeMap); + sb.append("scratchColumnTypeNames ").append(getScratchColumnTypeNames().toString()); return sb.toString(); } http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java index 3d7e4f0..b7e13dd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java @@ -53,9 +53,13 @@ import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspect import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.DataOutputBuffer; @@ -111,137 +115,55 @@ public class VectorizedBatchUtil { batch.size = size; } - /** - * Walk through the object inspector and add column vectors - * - * @param oi - * @param cvList - * ColumnVectors are populated in this list - */ - private static void allocateColumnVector(StructObjectInspector oi, - List<ColumnVector> cvList) throws HiveException { - if (cvList == null) { - throw new HiveException("Null columnvector list"); - } - if (oi == null) { - return; - } - final List<? 
extends StructField> fields = oi.getAllStructFieldRefs(); - for(StructField field : fields) { - ObjectInspector fieldObjectInspector = field.getFieldObjectInspector(); - switch(fieldObjectInspector.getCategory()) { - case PRIMITIVE: - PrimitiveObjectInspector poi = (PrimitiveObjectInspector) fieldObjectInspector; - switch(poi.getPrimitiveCategory()) { - case BOOLEAN: - case BYTE: - case SHORT: - case INT: - case LONG: - case TIMESTAMP: - case DATE: - case INTERVAL_YEAR_MONTH: - case INTERVAL_DAY_TIME: - cvList.add(new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE)); - break; - case FLOAT: - case DOUBLE: - cvList.add(new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE)); - break; - case BINARY: - case STRING: - case CHAR: - case VARCHAR: - cvList.add(new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE)); - break; - case DECIMAL: - DecimalTypeInfo tInfo = (DecimalTypeInfo) poi.getTypeInfo(); - cvList.add(new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE, - tInfo.precision(), tInfo.scale())); - break; - default: - throw new HiveException("Vectorizaton is not supported for datatype:" - + poi.getPrimitiveCategory()); - } - break; - case STRUCT: - throw new HiveException("Struct not supported"); - default: - throw new HiveException("Flattening is not supported for datatype:" - + fieldObjectInspector.getCategory()); - } - } - } - - - /** - * Create VectorizedRowBatch from ObjectInspector - * - * @param oi - * @return - * @throws HiveException - */ - public static VectorizedRowBatch constructVectorizedRowBatch( - StructObjectInspector oi) throws HiveException { - final List<ColumnVector> cvList = new LinkedList<ColumnVector>(); - allocateColumnVector(oi, cvList); - final VectorizedRowBatch result = new VectorizedRowBatch(cvList.size()); - int i = 0; - for(ColumnVector cv : cvList) { - result.cols[i++] = cv; - } - return result; - } + public static ColumnVector createColumnVector(String typeName) { + typeName = typeName.toLowerCase(); - /** - * Create VectorizedRowBatch from key and value object inspectors - * The row object inspector used by ReduceWork needs to be a **standard** - * struct object inspector, not just any struct object inspector. - * @param keyInspector - * @param valueInspector - * @param vectorScratchColumnTypeMap - * @return VectorizedRowBatch, OI - * @throws HiveException - */ - public static ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> constructVectorizedRowBatch( - StructObjectInspector keyInspector, StructObjectInspector valueInspector, Map<Integer, String> vectorScratchColumnTypeMap) - throws HiveException { - - ArrayList<String> colNames = new ArrayList<String>(); - ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>(); - List<? extends StructField> fields = keyInspector.getAllStructFieldRefs(); - for (StructField field: fields) { - colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName()); - ois.add(field.getFieldObjectInspector()); + // Allow undecorated CHAR and VARCHAR to support scratch column type names. + if (typeName.equals("char") || typeName.equals("varchar")) { + return new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE); } - fields = valueInspector.getAllStructFieldRefs(); - for (StructField field: fields) { - colNames.add(Utilities.ReduceField.VALUE.toString() + "." 
+ field.getFieldName()); - ois.add(field.getFieldObjectInspector()); - } - StandardStructObjectInspector rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois); - VectorizedRowBatchCtx batchContext = new VectorizedRowBatchCtx(); - batchContext.init(vectorScratchColumnTypeMap, rowObjectInspector); - return new ObjectPair<>(batchContext.createVectorizedRowBatch(), rowObjectInspector); + TypeInfo typeInfo = (TypeInfo) TypeInfoUtils.getTypeInfoFromTypeString(typeName); + return createColumnVector(typeInfo); } - /** - * Iterates through all columns in a given row and populates the batch - * - * @param row - * @param oi - * @param rowIndex - * @param batch - * @param buffer - * @throws HiveException - */ - public static void addRowToBatch(Object row, StructObjectInspector oi, - int rowIndex, - VectorizedRowBatch batch, - DataOutputBuffer buffer - ) throws HiveException { - addRowToBatchFrom(row, oi, rowIndex, 0, batch, buffer); + public static ColumnVector createColumnVector(TypeInfo typeInfo) { + switch(typeInfo.getCategory()) { + case PRIMITIVE: + { + PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo; + switch(primitiveTypeInfo.getPrimitiveCategory()) { + case BOOLEAN: + case BYTE: + case SHORT: + case INT: + case LONG: + case TIMESTAMP: + case DATE: + case INTERVAL_YEAR_MONTH: + case INTERVAL_DAY_TIME: + return new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE); + case FLOAT: + case DOUBLE: + return new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE); + case BINARY: + case STRING: + case CHAR: + case VARCHAR: + return new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE); + case DECIMAL: + DecimalTypeInfo tInfo = (DecimalTypeInfo) primitiveTypeInfo; + return new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE, + tInfo.precision(), tInfo.scale()); + default: + throw new RuntimeException("Vectorizaton is not supported for datatype:" + + primitiveTypeInfo.getPrimitiveCategory()); + } + } + default: + throw new RuntimeException("Vectorization is not supported for datatype:" + + typeInfo.getCategory()); + } } /** @@ -584,31 +506,30 @@ public class VectorizedBatchUtil { return ObjectInspectorFactory.getStandardStructObjectInspector(columnNames,oids); } - public static PrimitiveTypeInfo[] primitiveTypeInfosFromStructObjectInspector( + public static String[] columnNamesFromStructObjectInspector( StructObjectInspector structObjectInspector) throws HiveException { List<? 
extends StructField> fields = structObjectInspector.getAllStructFieldRefs(); - PrimitiveTypeInfo[] result = new PrimitiveTypeInfo[fields.size()]; + String[] result = new String[fields.size()]; int i = 0; for(StructField field : fields) { - TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString( - field.getFieldObjectInspector().getTypeName()); - result[i++] = (PrimitiveTypeInfo) typeInfo; + result[i++] = field.getFieldName(); } return result; } - public static PrimitiveTypeInfo[] primitiveTypeInfosFromTypeNames( - String[] typeNames) throws HiveException { - - PrimitiveTypeInfo[] result = new PrimitiveTypeInfo[typeNames.length]; + public static TypeInfo[] typeInfosFromTypeNames(String[] typeNames) throws HiveException { + ArrayList<TypeInfo> typeInfoList = + TypeInfoUtils.typeInfosFromTypeNames(Arrays.asList(typeNames)); + return typeInfoList.toArray(new TypeInfo[0]); + } - for(int i = 0; i < typeNames.length; i++) { - TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeNames[i]); - result[i] = (PrimitiveTypeInfo) typeInfo; - } - return result; + public static TypeInfo[] typeInfosFromStructObjectInspector( + StructObjectInspector structObjectInspector) { + ArrayList<TypeInfo> typeInfoList = + TypeInfoUtils.typeInfosFromStructObjectInspector(structObjectInspector); + return typeInfoList.toArray(new TypeInfo[0]); } /** http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java deleted file mode 100644 index 5ce7553..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java +++ /dev/null @@ -1,277 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql.exec.vector; - -import java.nio.ByteBuffer; -import java.sql.Timestamp; -import java.util.List; - -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.serde2.ByteStream; -import org.apache.hadoop.hive.serde2.SerDe; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.SerDeStats; -import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; -import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; -import org.apache.hadoop.hive.serde2.io.DateWritable; -import org.apache.hadoop.hive.serde2.io.TimestampWritable; -import org.apache.hadoop.hive.serde2.lazy.LazyDate; -import org.apache.hadoop.hive.serde2.lazy.LazyLong; -import org.apache.hadoop.hive.serde2.lazy.LazyTimestamp; -import org.apache.hadoop.hive.serde2.lazy.LazyUtils; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.ObjectWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; - -/** - * VectorizedColumnarSerDe is used by Vectorized query execution engine - * for columnar based storage supported by RCFile. - */ -public class VectorizedColumnarSerDe extends ColumnarSerDe implements VectorizedSerde { - - public VectorizedColumnarSerDe() throws SerDeException { - } - - private final BytesRefArrayWritable[] byteRefArray = new BytesRefArrayWritable[VectorizedRowBatch.DEFAULT_SIZE]; - private final ObjectWritable ow = new ObjectWritable(); - private final ByteStream.Output serializeVectorStream = new ByteStream.Output(); - - /** - * Serialize a vectorized row batch - * - * @param vrg - * Vectorized row batch to serialize - * @param objInspector - * The ObjectInspector for the row object - * @return The serialized Writable object - * @throws SerDeException - * @see SerDe#serialize(Object, ObjectInspector) - */ - @Override - public Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspector) - throws SerDeException { - try { - // Validate that the OI is of struct type - if (objInspector.getCategory() != Category.STRUCT) { - throw new UnsupportedOperationException(getClass().toString() - + " can only serialize struct types, but we got: " - + objInspector.getTypeName()); - } - - VectorizedRowBatch batch = (VectorizedRowBatch) vrg; - StructObjectInspector soi = (StructObjectInspector) objInspector; - List<? 
extends StructField> fields = soi.getAllStructFieldRefs(); - - // Reset the byte buffer - serializeVectorStream.reset(); - int count = 0; - int rowIndex = 0; - for (int i = 0; i < batch.size; i++) { - - // If selectedInUse is true then we need to serialize only - // the selected indexes - if (batch.selectedInUse) { - rowIndex = batch.selected[i]; - } else { - rowIndex = i; - } - - BytesRefArrayWritable byteRow = byteRefArray[i]; - int numCols = fields.size(); - - if (byteRow == null) { - byteRow = new BytesRefArrayWritable(numCols); - byteRefArray[i] = byteRow; - } - - byteRow.resetValid(numCols); - - for (int p = 0; p < batch.projectionSize; p++) { - int k = batch.projectedColumns[p]; - ObjectInspector foi = fields.get(k).getFieldObjectInspector(); - ColumnVector currentColVector = batch.cols[k]; - - switch (foi.getCategory()) { - case PRIMITIVE: { - PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi; - if (!currentColVector.noNulls - && (currentColVector.isRepeating || currentColVector.isNull[rowIndex])) { - // The column is null hence write null value - serializeVectorStream.write(new byte[0], 0, 0); - } else { - // If here then the vector value is not null. - if (currentColVector.isRepeating) { - // If the vector has repeating values then set rowindex to zero - rowIndex = 0; - } - - switch (poi.getPrimitiveCategory()) { - case BOOLEAN: { - LongColumnVector lcv = (LongColumnVector) batch.cols[k]; - // In vectorization true is stored as 1 and false as 0 - boolean b = lcv.vector[rowIndex] == 1 ? true : false; - if (b) { - serializeVectorStream.write(LazyUtils.trueBytes, 0, LazyUtils.trueBytes.length); - } else { - serializeVectorStream.write(LazyUtils.trueBytes, 0, LazyUtils.trueBytes.length); - } - } - break; - case BYTE: - case SHORT: - case INT: - case LONG: - LongColumnVector lcv = (LongColumnVector) batch.cols[k]; - LazyLong.writeUTF8(serializeVectorStream, lcv.vector[rowIndex]); - break; - case FLOAT: - case DOUBLE: - DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[k]; - ByteBuffer b = Text.encode(String.valueOf(dcv.vector[rowIndex])); - serializeVectorStream.write(b.array(), 0, b.limit()); - break; - case BINARY: { - BytesColumnVector bcv = (BytesColumnVector) batch.cols[k]; - byte[] bytes = bcv.vector[rowIndex]; - serializeVectorStream.write(bytes, 0, bytes.length); - } - break; - case STRING: - case CHAR: - case VARCHAR: { - // Is it correct to escape CHAR and VARCHAR? 
- BytesColumnVector bcv = (BytesColumnVector) batch.cols[k]; - LazyUtils.writeEscaped(serializeVectorStream, bcv.vector[rowIndex], - bcv.start[rowIndex], - bcv.length[rowIndex], - serdeParams.isEscaped(), serdeParams.getEscapeChar(), serdeParams - .getNeedsEscape()); - } - break; - case TIMESTAMP: - LongColumnVector tcv = (LongColumnVector) batch.cols[k]; - long timeInNanoSec = tcv.vector[rowIndex]; - Timestamp t = new Timestamp(0); - TimestampUtils.assignTimeInNanoSec(timeInNanoSec, t); - TimestampWritable tw = new TimestampWritable(); - tw.set(t); - LazyTimestamp.writeUTF8(serializeVectorStream, tw); - break; - case DATE: - LongColumnVector dacv = (LongColumnVector) batch.cols[k]; - DateWritable daw = new DateWritable((int) dacv.vector[rowIndex]); - LazyDate.writeUTF8(serializeVectorStream, daw); - break; - default: - throw new UnsupportedOperationException( - "Vectorizaton is not supported for datatype:" - + poi.getPrimitiveCategory()); - } - } - break; - } - case LIST: - case MAP: - case STRUCT: - case UNION: - throw new UnsupportedOperationException("Vectorizaton is not supported for datatype:" - + foi.getCategory()); - default: - throw new SerDeException("Unknown ObjectInspector category!"); - - } - - byteRow.get(k).set(serializeVectorStream.getData(), count, serializeVectorStream - .getLength() - count); - count = serializeVectorStream.getLength(); - } - - } - ow.set(byteRefArray); - } catch (Exception e) { - throw new SerDeException(e); - } - return ow; - } - - @Override - public SerDeStats getSerDeStats() { - return null; - } - - @Override - public Class<? extends Writable> getSerializedClass() { - return BytesRefArrayWritable.class; - } - - @Override - public Object deserialize(Writable blob) throws SerDeException { - - // Ideally this should throw UnsupportedOperationException as the serde is - // vectorized serde. But since RC file reader does not support vectorized reading this - // is left as it is. This function will be called from VectorizedRowBatchCtx::addRowToBatch - // to deserialize the row one by one and populate the batch. Once RC file reader supports vectorized - // reading this serde and be standalone serde with no dependency on ColumnarSerDe. 
- return super.deserialize(blob); - } - - @Override - public ObjectInspector getObjectInspector() throws SerDeException { - return cachedObjectInspector; - } - - @Override - public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { - throw new UnsupportedOperationException(); - } - - /** - * Deserializes the rowBlob into Vectorized row batch - * @param rowBlob - * rowBlob row batch to deserialize - * @param rowsInBlob - * Total number of rows in rowBlob to deserialize - * @param reuseBatch - * VectorizedRowBatch to which the rows should be serialized * - * @throws SerDeException - */ - @Override - public void deserializeVector(Object rowBlob, int rowsInBlob, - VectorizedRowBatch reuseBatch) throws SerDeException { - - BytesRefArrayWritable[] refArray = (BytesRefArrayWritable[]) rowBlob; - DataOutputBuffer buffer = new DataOutputBuffer(); - for (int i = 0; i < rowsInBlob; i++) { - Object row = deserialize(refArray[i]); - try { - VectorizedBatchUtil.addRowToBatch(row, - (StructObjectInspector) cachedObjectInspector, i, - reuseBatch, buffer); - } catch (HiveException e) { - throw new SerDeException(e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java index 7e41384..2882024 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java @@ -45,6 +45,10 @@ public class VectorizedRowBatch implements Writable { public int[] projectedColumns; public int projectionSize; + private int dataColumnCount; + private int partitionColumnCount; + + /* * If no filtering has been applied yet, selectedInUse is false, * meaning that all rows qualify. 
If it is true, then the selected[] array @@ -94,6 +98,22 @@ public class VectorizedRowBatch implements Writable { for (int i = 0; i < numCols; i++) { projectedColumns[i] = i; } + + dataColumnCount = -1; + partitionColumnCount = -1; + } + + public void setPartitionInfo(int dataColumnCount, int partitionColumnCount) { + this.dataColumnCount = dataColumnCount; + this.partitionColumnCount = partitionColumnCount; + } + + public int getDataColumnCount() { + return dataColumnCount; + } + + public int getPartitionColumnCount() { + return partitionColumnCount; } /** http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java index 81ab129..efb06b2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java @@ -20,16 +20,10 @@ package org.apache.hadoop.hive.ql.exec.vector; import java.io.IOException; import java.sql.Date; import java.sql.Timestamp; -import java.util.ArrayList; import java.util.Arrays; -import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -38,392 +32,270 @@ import org.apache.hadoop.hive.common.type.HiveChar; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.common.type.HiveIntervalDayTime; import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.IOPrepareCache; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.PartitionDesc; -import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; -import org.apache.hadoop.hive.serde2.Deserializer; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import 
org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileSplit; import org.apache.hive.common.util.DateUtils; /** - * Context for Vectorized row batch. this calss does eager deserialization of row data using serde + * Context for Vectorized row batch. this class does eager deserialization of row data using serde * in the RecordReader layer. * It has supports partitions in this layer so that the vectorized batch is populated correctly * with the partition column. */ public class VectorizedRowBatchCtx { + private static final long serialVersionUID = 1L; + private static final Log LOG = LogFactory.getLog(VectorizedRowBatchCtx.class.getName()); - // OI for raw row data (EG without partition cols) - private StructObjectInspector rawRowOI; + // The following information is for creating VectorizedRowBatch and for helping with + // knowing how the table is partitioned. + // + // It will be stored in MapWork and ReduceWork. + private String[] rowColumnNames; + private TypeInfo[] rowColumnTypeInfos; + private int dataColumnCount; + private int partitionColumnCount; - // OI for the row (Raw row OI + partition OI) - private StructObjectInspector rowOI; + private String[] scratchColumnTypeNames; - // Deserializer for the row data - private Deserializer deserializer; + /** + * Constructor for VectorizedRowBatchCtx + */ + public VectorizedRowBatchCtx() { + } - // Hash map of partition values. Key=TblColName value=PartitionValue - private Map<String, Object> partitionValues; - - //partition types - private Map<String, PrimitiveCategory> partitionTypes; + public VectorizedRowBatchCtx(String[] rowColumnNames, TypeInfo[] rowColumnTypeInfos, + int partitionColumnCount, String[] scratchColumnTypeNames) { + this.rowColumnNames = rowColumnNames; + this.rowColumnTypeInfos = rowColumnTypeInfos; + this.partitionColumnCount = partitionColumnCount; + this.scratchColumnTypeNames = scratchColumnTypeNames; - // partition column positions, for use by classes that need to know whether a given column is a - // partition column - private Set<Integer> partitionCols; - - // Column projection list - List of column indexes to include. This - // list does not contain partition columns - private List<Integer> colsToInclude; + dataColumnCount = rowColumnTypeInfos.length - partitionColumnCount; + } - private Map<Integer, String> scratchColumnTypeMap = null; + public String[] getRowColumnNames() { + return rowColumnNames; + } - /** - * Constructor for VectorizedRowBatchCtx - * - * @param rawRowOI - * OI for raw row data (EG without partition cols) - * @param rowOI - * OI for the row (Raw row OI + partition OI) - * @param deserializer - * Deserializer for the row data - * @param partitionValues - * Hash map of partition values. 
Key=TblColName value=PartitionValue - */ - public VectorizedRowBatchCtx(StructObjectInspector rawRowOI, StructObjectInspector rowOI, - Deserializer deserializer, Map<String, Object> partitionValues, - Map<String, PrimitiveCategory> partitionTypes) { - this.rowOI = rowOI; - this.rawRowOI = rawRowOI; - this.deserializer = deserializer; - this.partitionValues = partitionValues; - this.partitionTypes = partitionTypes; + public TypeInfo[] getRowColumnTypeInfos() { + return rowColumnTypeInfos; } - /** - * Constructor for VectorizedRowBatchCtx - */ - public VectorizedRowBatchCtx() { + public int getDataColumnCount() { + return dataColumnCount; + } + public int getPartitionColumnCount() { + return partitionColumnCount; + } + + public String[] getScratchColumnTypeNames() { + return scratchColumnTypeNames; } /** - * Initializes the VectorizedRowBatch context based on an scratch column type map and + * Initializes the VectorizedRowBatch context based on an scratch column type names and * object inspector. - * @param scratchColumnTypeMap - * @param rowOI + * @param structObjectInspector + * @param scratchColumnTypeNames * Object inspector that shapes the column types + * @throws HiveException */ - public void init(Map<Integer, String> scratchColumnTypeMap, - StructObjectInspector rowOI) { - this.scratchColumnTypeMap = scratchColumnTypeMap; - this.rowOI= rowOI; - this.rawRowOI = rowOI; + public void init(StructObjectInspector structObjectInspector, String[] scratchColumnTypeNames) + throws HiveException { + + // Row column information. + rowColumnNames = VectorizedBatchUtil.columnNamesFromStructObjectInspector(structObjectInspector); + rowColumnTypeInfos = VectorizedBatchUtil.typeInfosFromStructObjectInspector(structObjectInspector); + partitionColumnCount = 0; + dataColumnCount = rowColumnTypeInfos.length; + + // Scratch column information. + this.scratchColumnTypeNames = scratchColumnTypeNames; } - /** - * Initializes VectorizedRowBatch context based on the - * split and Hive configuration (Job conf with hive Plan). - * - * @param hiveConf - * Hive configuration using Hive plan is extracted - * @param split - * File split of the file being read - * @throws ClassNotFoundException - * @throws IOException - * @throws SerDeException - * @throws InstantiationException - * @throws IllegalAccessException - * @throws HiveException - */ - public void init(Configuration hiveConf, FileSplit split) throws ClassNotFoundException, - IOException, - SerDeException, - InstantiationException, - IllegalAccessException, - HiveException { + public static void getPartitionValues(VectorizedRowBatchCtx vrbCtx, Configuration hiveConf, + FileSplit split, Object[] partitionValues) throws IOException { Map<String, PartitionDesc> pathToPartitionInfo = Utilities .getMapWork(hiveConf).getPathToPartitionInfo(); - PartitionDesc part = HiveFileFormatUtils + PartitionDesc partDesc = HiveFileFormatUtils .getPartitionDescFromPathRecursively(pathToPartitionInfo, split.getPath(), IOPrepareCache.get().getPartitionDescMap()); - String partitionPath = split.getPath().getParent().toString(); - scratchColumnTypeMap = Utilities.getMapWorkVectorScratchColumnTypeMap(hiveConf); - // LOG.info("VectorizedRowBatchCtx init scratchColumnTypeMap " + scratchColumnTypeMap.toString()); - - Properties partProps = - (part.getPartSpec() == null || part.getPartSpec().isEmpty()) ? 
- part.getTableDesc().getProperties() : part.getProperties(); - - Class serdeclass = hiveConf.getClassByName(part.getSerdeClassName()); - Deserializer partDeserializer = (Deserializer) serdeclass.newInstance(); - SerDeUtils.initializeSerDe(partDeserializer, hiveConf, part.getTableDesc().getProperties(), - partProps); - StructObjectInspector partRawRowObjectInspector = (StructObjectInspector) partDeserializer - .getObjectInspector(); - - deserializer = partDeserializer; - - // Check to see if this split is part of a partition of a table - String pcols = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS); - - String[] partKeys = null; - if (pcols != null && pcols.length() > 0) { - - // Partitions exist for this table. Get the partition object inspector and - // raw row object inspector (row with out partition col) - LinkedHashMap<String, String> partSpec = part.getPartSpec(); - partKeys = pcols.trim().split("/"); - String pcolTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES); - String[] partKeyTypes = pcolTypes.trim().split(":"); - - if (partKeys.length > partKeyTypes.length) { - throw new HiveException("Internal error : partKeys length, " +partKeys.length + - " greater than partKeyTypes length, " + partKeyTypes.length); - } - - List<String> partNames = new ArrayList<String>(partKeys.length); - List<ObjectInspector> partObjectInspectors = new ArrayList<ObjectInspector>(partKeys.length); - partitionValues = new LinkedHashMap<String, Object>(); - partitionTypes = new LinkedHashMap<String, PrimitiveCategory>(); - for (int i = 0; i < partKeys.length; i++) { - String key = partKeys[i]; - partNames.add(key); - ObjectInspector objectInspector = null; - Object objectVal; - if (partSpec == null) { - // for partitionless table, initialize partValue to empty string. - // We can have partitionless table even if we have partition keys - // when there is only only partition selected and the partition key is not - // part of the projection/include list. - objectVal = null; - objectInspector = PrimitiveObjectInspectorFactory.javaStringObjectInspector; - partitionTypes.put(key, PrimitiveCategory.STRING); - } else { - // Create a Standard java object Inspector - PrimitiveTypeInfo partColTypeInfo = TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i]); - objectInspector = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo( - partColTypeInfo); - objectVal = - ObjectInspectorConverters. - getConverter(PrimitiveObjectInspectorFactory. - javaStringObjectInspector, objectInspector). 
- convert(partSpec.get(key)); - if (partColTypeInfo instanceof CharTypeInfo) { - objectVal = ((HiveChar) objectVal).getStrippedValue(); - } - partitionTypes.put(key, partColTypeInfo.getPrimitiveCategory()); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Partition column: name: " + key + ", value: " + objectVal + ", type: " + partitionTypes.get(key)); - } - partitionValues.put(key, objectVal); - partObjectInspectors.add(objectInspector); - } - - // Create partition OI - StructObjectInspector partObjectInspector = ObjectInspectorFactory - .getStandardStructObjectInspector(partNames, partObjectInspectors); - - // Get row OI from partition OI and raw row OI - StructObjectInspector rowObjectInspector = ObjectInspectorFactory - .getUnionStructObjectInspector(Arrays - .asList(new StructObjectInspector[] {partRawRowObjectInspector, partObjectInspector})); - rowOI = rowObjectInspector; - rawRowOI = partRawRowObjectInspector; - - // We have to do this after we've set rowOI, as getColIndexBasedOnColName uses it - partitionCols = new HashSet<Integer>(); - if (pcols != null && pcols.length() > 0) { - for (int i = 0; i < partKeys.length; i++) { - partitionCols.add(getColIndexBasedOnColName(partKeys[i])); - } - } + getPartitionValues(vrbCtx, partDesc, partitionValues); - } else { + } - // No partitions for this table, hence row OI equals raw row OI - rowOI = partRawRowObjectInspector; - rawRowOI = partRawRowObjectInspector; + public static void getPartitionValues(VectorizedRowBatchCtx vrbCtx, PartitionDesc partDesc, + Object[] partitionValues) { + + LinkedHashMap<String, String> partSpec = partDesc.getPartSpec(); + + for (int i = 0; i < vrbCtx.partitionColumnCount; i++) { + Object objectValue; + if (partSpec == null) { + // For partition-less table, initialize partValue to empty string. + // We can have partition-less table even if we have partition keys + // when there is only only partition selected and the partition key is not + // part of the projection/include list. + objectValue = null; + } else { + String key = vrbCtx.rowColumnNames[vrbCtx.dataColumnCount + i]; + + // Create a Standard java object Inspector + ObjectInspector objectInspector = + TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo( + vrbCtx.rowColumnTypeInfos[vrbCtx.dataColumnCount + i]); + objectValue = + ObjectInspectorConverters. + getConverter(PrimitiveObjectInspectorFactory. + javaStringObjectInspector, objectInspector). + convert(partSpec.get(key)); + } + partitionValues[i] = objectValue; } - - colsToInclude = ColumnProjectionUtils.getReadColumnIDs(hiveConf); } - + /** * Creates a Vectorized row batch and the column vectors. * * @return VectorizedRowBatch * @throws HiveException */ - public VectorizedRowBatch createVectorizedRowBatch() throws HiveException + public VectorizedRowBatch createVectorizedRowBatch() { - List<? extends StructField> fieldRefs = rowOI.getAllStructFieldRefs(); - VectorizedRowBatch result = new VectorizedRowBatch(fieldRefs.size()); - for (int j = 0; j < fieldRefs.size(); j++) { - // If the column is included in the include list or if the column is a - // partition column then create the column vector. Also note that partition columns are not - // in the included list. 
- if ((colsToInclude == null) || colsToInclude.contains(j) - || ((partitionValues != null) && - partitionValues.containsKey(fieldRefs.get(j).getFieldName()))) { - ObjectInspector foi = fieldRefs.get(j).getFieldObjectInspector(); - switch (foi.getCategory()) { - case PRIMITIVE: { - PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi; - // Vectorization currently only supports the following data types: - // BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, BINARY, STRING, CHAR, VARCHAR, TIMESTAMP, - // DATE and DECIMAL - switch (poi.getPrimitiveCategory()) { - case BOOLEAN: - case BYTE: - case SHORT: - case INT: - case LONG: - case TIMESTAMP: - case DATE: - case INTERVAL_YEAR_MONTH: - case INTERVAL_DAY_TIME: - result.cols[j] = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE); - break; - case FLOAT: - case DOUBLE: - result.cols[j] = new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE); - break; - case BINARY: - case STRING: - case CHAR: - case VARCHAR: - result.cols[j] = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE); - break; - case DECIMAL: - DecimalTypeInfo tInfo = (DecimalTypeInfo) poi.getTypeInfo(); - result.cols[j] = new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE, - tInfo.precision(), tInfo.scale()); - break; - default: - throw new RuntimeException("Vectorizaton is not supported for datatype:" - + poi.getPrimitiveCategory()); - } - break; - } - case LIST: - case MAP: - case STRUCT: - case UNION: - throw new HiveException("Vectorizaton is not supported for datatype:" - + foi.getCategory()); - default: - throw new HiveException("Unknown ObjectInspector category!"); - } - } + int totalColumnCount = rowColumnTypeInfos.length + scratchColumnTypeNames.length; + VectorizedRowBatch result = new VectorizedRowBatch(totalColumnCount); + + LOG.info("createVectorizedRowBatch columnsToIncludeTruncated NONE"); + for (int i = 0; i < rowColumnTypeInfos.length; i++) { + TypeInfo typeInfo = rowColumnTypeInfos[i]; + result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo); + } + + for (int i = 0; i < scratchColumnTypeNames.length; i++) { + String typeName = scratchColumnTypeNames[i]; + result.cols[rowColumnTypeInfos.length + i] = + VectorizedBatchUtil.createColumnVector(typeName); } - result.numCols = fieldRefs.size(); - this.addScratchColumnsToBatch(result); + + result.setPartitionInfo(dataColumnCount, partitionColumnCount); + result.reset(); return result; } - /** - * Adds the row to the batch after deserializing the row - * - * @param rowIndex - * Row index in the batch to which the row is added - * @param rowBlob - * Row blob (serialized version of row) - * @param batch - * Vectorized batch to which the row is added - * @param buffer a buffer to copy strings into - * @throws HiveException - * @throws SerDeException - */ - public void addRowToBatch(int rowIndex, Writable rowBlob, - VectorizedRowBatch batch, - DataOutputBuffer buffer - ) throws HiveException, SerDeException + public VectorizedRowBatch createVectorizedRowBatch(boolean[] columnsToIncludeTruncated) { - Object row = this.deserializer.deserialize(rowBlob); - VectorizedBatchUtil.addRowToBatch(row, this.rawRowOI, rowIndex, batch, buffer); - } + if (columnsToIncludeTruncated == null) { + return createVectorizedRowBatch(); + } - /** - * Deserialized set of rows and populates the batch - * - * @param rowBlob - * to deserialize - * @param batch - * Vectorized row batch which contains deserialized data - * @throws SerDeException - */ - public void convertRowBatchBlobToVectorizedBatch(Object rowBlob, int 
rowsInBlob, - VectorizedRowBatch batch) - throws SerDeException { - - if (deserializer instanceof VectorizedSerde) { - ((VectorizedSerde) deserializer).deserializeVector(rowBlob, rowsInBlob, batch); - } else { - throw new SerDeException( - "Not able to deserialize row batch. Serde does not implement VectorizedSerde"); + LOG.info("createVectorizedRowBatch columnsToIncludeTruncated " + Arrays.toString(columnsToIncludeTruncated)); + int totalColumnCount = rowColumnTypeInfos.length + scratchColumnTypeNames.length; + VectorizedRowBatch result = new VectorizedRowBatch(totalColumnCount); + + for (int i = 0; i < columnsToIncludeTruncated.length; i++) { + if (columnsToIncludeTruncated[i]) { + TypeInfo typeInfo = rowColumnTypeInfos[i]; + result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo); + } } + + for (int i = dataColumnCount; i < dataColumnCount + partitionColumnCount; i++) { + TypeInfo typeInfo = rowColumnTypeInfos[i]; + result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo); + } + + for (int i = 0; i < scratchColumnTypeNames.length; i++) { + String typeName = scratchColumnTypeNames[i]; + result.cols[rowColumnTypeInfos.length + i] = + VectorizedBatchUtil.createColumnVector(typeName); + } + + result.setPartitionInfo(dataColumnCount, partitionColumnCount); + + result.reset(); + return result; } - private int getColIndexBasedOnColName(String colName) throws HiveException - { - List<? extends StructField> fieldRefs = rowOI.getAllStructFieldRefs(); - for (int i = 0; i < fieldRefs.size(); i++) { - if (fieldRefs.get(i).getFieldName().equals(colName)) { - return i; + public boolean[] getColumnsToIncludeTruncated(Configuration conf) { + boolean[] columnsToIncludeTruncated = null; + + List<Integer> columnsToIncludeTruncatedList = ColumnProjectionUtils.getReadColumnIDs(conf); + if (columnsToIncludeTruncatedList != null && columnsToIncludeTruncatedList.size() > 0 ) { + + // Partitioned columns will not be in the include list. + + boolean[] columnsToInclude = new boolean[dataColumnCount]; + Arrays.fill(columnsToInclude, false); + for (int columnNum : columnsToIncludeTruncatedList) { + if (columnNum < dataColumnCount) { + columnsToInclude[columnNum] = true; + } + } + + // Work backwards to find the highest wanted column. + + int highestWantedColumnNum = -1; + for (int i = dataColumnCount - 1; i >= 0; i--) { + if (columnsToInclude[i]) { + highestWantedColumnNum = i; + break; + } + } + if (highestWantedColumnNum == -1) { + throw new RuntimeException("No columns to include?"); + } + int newColumnCount = highestWantedColumnNum + 1; + if (newColumnCount == dataColumnCount) { + // Didn't trim any columns off the end. Use the original. 
+ columnsToIncludeTruncated = columnsToInclude; + } else { + columnsToIncludeTruncated = Arrays.copyOf(columnsToInclude, newColumnCount); } } - throw new HiveException("Not able to find column name in row object inspector"); + return columnsToIncludeTruncated; } - + /** * Add the partition values to the batch * * @param batch + * @param partitionValues * @throws HiveException */ - public void addPartitionColsToBatch(VectorizedRowBatch batch) throws HiveException + public void addPartitionColsToBatch(VectorizedRowBatch batch, Object[] partitionValues) { - int colIndex; - Object value; - PrimitiveCategory pCategory; if (partitionValues != null) { - for (String key : partitionValues.keySet()) { - colIndex = getColIndexBasedOnColName(key); - value = partitionValues.get(key); - pCategory = partitionTypes.get(key); - - switch (pCategory) { + for (int i = 0; i < partitionColumnCount; i++) { + Object value = partitionValues[i]; + + int colIndex = dataColumnCount + i; + String partitionColumnName = rowColumnNames[colIndex]; + PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) rowColumnTypeInfos[colIndex]; + switch (primitiveTypeInfo.getPrimitiveCategory()) { case BOOLEAN: { LongColumnVector lcv = (LongColumnVector) batch.cols[colIndex]; if (value == null) { @@ -575,7 +447,7 @@ public class VectorizedRowBatchCtx { HiveDecimal hd = (HiveDecimal) value; dv.set(0, hd); dv.isRepeating = true; - dv.isNull[0] = false; + dv.isNull[0] = false; } } break; @@ -604,15 +476,15 @@ public class VectorizedRowBatchCtx { bcv.isNull[0] = true; bcv.isRepeating = true; } else { - bcv.fill(sVal.getBytes()); + bcv.fill(sVal.getBytes()); bcv.isNull[0] = false; } } break; - + default: - throw new HiveException("Unable to recognize the partition type " + pCategory + - " for column " + key); + throw new RuntimeException("Unable to recognize the partition type " + primitiveTypeInfo.getPrimitiveCategory() + + " for column " + partitionColumnName); } } } @@ -620,64 +492,12 @@ public class VectorizedRowBatchCtx { /** * Determine whether a given column is a partition column - * @param colnum column number in + * @param colNum column number in * {@link org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch}s created by this context. * @return true if it is a partition column, false otherwise */ - public final boolean isPartitionCol(int colnum) { - return (partitionCols == null) ? false : partitionCols.contains(colnum); - } - - private void addScratchColumnsToBatch(VectorizedRowBatch vrb) throws HiveException { - if (scratchColumnTypeMap != null && !scratchColumnTypeMap.isEmpty()) { - int origNumCols = vrb.numCols; - int newNumCols = vrb.cols.length+scratchColumnTypeMap.keySet().size(); - vrb.cols = Arrays.copyOf(vrb.cols, newNumCols); - for (int i = origNumCols; i < newNumCols; i++) { - String typeName = scratchColumnTypeMap.get(i); - if (typeName == null) { - throw new HiveException("No type entry found for column " + i + " in map " + scratchColumnTypeMap.toString()); - } - vrb.cols[i] = allocateColumnVector(typeName, - VectorizedRowBatch.DEFAULT_SIZE); - } - vrb.numCols = vrb.cols.length; - } - } - - /** - * Get the scale and precision for the given decimal type string. The decimal type is assumed to be - * of the format decimal(precision,scale) e.g. decimal(20,10). - * @param decimalType The given decimal type string. - * @return An integer array of size 2 with first element set to precision and second set to scale. 
- */ - private static int[] getScalePrecisionFromDecimalType(String decimalType) { - Pattern p = Pattern.compile("\\d+"); - Matcher m = p.matcher(decimalType); - m.find(); - int precision = Integer.parseInt(m.group()); - m.find(); - int scale = Integer.parseInt(m.group()); - int [] precScale = { precision, scale }; - return precScale; + public final boolean isPartitionCol(int colNum) { + return colNum >= dataColumnCount && colNum < rowColumnTypeInfos.length; } - public static ColumnVector allocateColumnVector(String type, int defaultSize) { - if (type.equalsIgnoreCase("double")) { - return new DoubleColumnVector(defaultSize); - } else if (VectorizationContext.isStringFamily(type)) { - return new BytesColumnVector(defaultSize); - } else if (VectorizationContext.decimalTypePattern.matcher(type).matches()){ - int [] precisionScale = getScalePrecisionFromDecimalType(type); - return new DecimalColumnVector(defaultSize, precisionScale[0], precisionScale[1]); - } else if (type.equalsIgnoreCase("long") || - type.equalsIgnoreCase("date") || - type.equalsIgnoreCase("timestamp") || - type.equalsIgnoreCase(serdeConstants.INTERVAL_YEAR_MONTH_TYPE_NAME) || - type.equalsIgnoreCase(serdeConstants.INTERVAL_DAY_TIME_TYPE_NAME)) { - return new LongColumnVector(defaultSize); - } else { - throw new RuntimeException("Cannot allocate vector column for " + type); - } - } } http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java index 35e3403..f28d3ab 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Future; -import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -34,12 +33,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.HashTableLoaderFactory; import org.apache.hadoop.hive.ql.exec.HashTableLoader; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; -import org.apache.hadoop.hive.ql.exec.MapredContext; import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer.ReusableGetAdaptor; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorColumnMapping; import org.apache.hadoop.hive.ql.exec.vector.VectorColumnOutputMapping; @@ -58,7 +52,6 @@ import org.apache.hadoop.hive.ql.exec.vector.mapjoin.optimized.VectorMapJoinOpti import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTable; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinTableContainer; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastHashTableLoader; -import org.apache.hadoop.hive.ql.log.PerfLogger; import 
org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -70,6 +63,8 @@ import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; /** * This class is common operator class for native vectorized map join. @@ -572,10 +567,11 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem * Create our vectorized copy row and deserialize row helper objects. */ if (smallTableMapping.getCount() > 0) { - smallTableVectorDeserializeRow = new VectorDeserializeRow( - new LazyBinaryDeserializeRead( - VectorizedBatchUtil.primitiveTypeInfosFromTypeNames( - smallTableMapping.getTypeNames()))); + smallTableVectorDeserializeRow = + new VectorDeserializeRow( + new LazyBinaryDeserializeRead( + VectorizedBatchUtil.typeInfosFromTypeNames( + smallTableMapping.getTypeNames()))); smallTableVectorDeserializeRow.init(smallTableMapping.getOutputColumns()); } @@ -649,23 +645,13 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem * Setup our 2nd batch with the same "column schema" as the big table batch that can be used to * build join output results in. */ - protected VectorizedRowBatch setupOverflowBatch() { + protected VectorizedRowBatch setupOverflowBatch() throws HiveException { + + int initialColumnCount = vContext.firstOutputColumnIndex(); VectorizedRowBatch overflowBatch; - Map<Integer, String> scratchColumnTypeMap = vOutContext.getScratchColumnTypeMap(); - int maxColumn = 0; - for (int i = 0; i < outputProjection.length; i++) { - int outputColumn = outputProjection[i]; - if (maxColumn < outputColumn) { - maxColumn = outputColumn; - } - } - for (int outputColumn : scratchColumnTypeMap.keySet()) { - if (maxColumn < outputColumn) { - maxColumn = outputColumn; - } - } - overflowBatch = new VectorizedRowBatch(maxColumn + 1); + int totalNumColumns = initialColumnCount + vOutContext.getScratchColumnTypeNames().length; + overflowBatch = new VectorizedRowBatch(totalNumColumns); // First, just allocate just the projection columns we will be using. for (int i = 0; i < outputProjection.length; i++) { @@ -675,9 +661,9 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem } // Now, add any scratch columns needed for children operators. 
- for (int outputColumn : scratchColumnTypeMap.keySet()) { - String typeName = scratchColumnTypeMap.get(outputColumn); - allocateOverflowBatchColumnVector(overflowBatch, outputColumn, typeName); + int outputColumn = initialColumnCount; + for (String typeName : vOutContext.getScratchColumnTypeNames()) { + allocateOverflowBatchColumnVector(overflowBatch, outputColumn++, typeName); } overflowBatch.projectedColumns = outputProjection; @@ -695,22 +681,13 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem String typeName) { if (overflowBatch.cols[outputColumn] == null) { - String vectorTypeName; - if (VectorizationContext.isIntFamily(typeName) || - VectorizationContext.isDatetimeFamily(typeName)) { - vectorTypeName = "long"; - } else if (VectorizationContext.isFloatFamily(typeName)) { - vectorTypeName = "double"; - } else if (VectorizationContext.isStringFamily(typeName)) { - vectorTypeName = "string"; - } else if (VectorizationContext.decimalTypePattern.matcher(typeName).matches()){ - vectorTypeName = typeName; // Keep precision and scale. - } else { - throw new RuntimeException("Cannot determine vector type for " + typeName); - } - overflowBatch.cols[outputColumn] = VectorizedRowBatchCtx.allocateColumnVector(vectorTypeName, VectorizedRowBatch.DEFAULT_SIZE); + typeName = VectorizationContext.mapTypeNameSynonyms(typeName); - if (LOG.isDebugEnabled()) { + TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeName); + + overflowBatch.cols[outputColumn] = VectorizedBatchUtil.createColumnVector(typeInfo); + + if (isLogDebugEnabled) { LOG.debug(taskName + ", " + getOperatorId() + " VectorMapJoinCommonOperator initializeOp overflowBatch outputColumn " + outputColumn + " class " + overflowBatch.cols[outputColumn].getClass().getSimpleName()); } }
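
The allocateOverflowBatchColumnVector hunk above replaces the hand-rolled long/double/string/decimal switch with a TypeInfo-driven allocation. The standalone sketch below (not part of the patch) illustrates that pattern, assuming only the three calls behave as they are used in the hunk; the class and method names ScratchColumnSketch and allocateScratchColumn are invented for illustration.

import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public class ScratchColumnSketch {

  // Hypothetical helper mirroring the new allocateOverflowBatchColumnVector body.
  public static ColumnVector allocateScratchColumn(String typeName) {
    // Normalize any type-name synonym first, as the hunk does before parsing.
    String canonicalName = VectorizationContext.mapTypeNameSynonyms(typeName);
    TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(canonicalName);
    // VectorizedBatchUtil picks the concrete ColumnVector subclass from the
    // TypeInfo, so decimal precision/scale no longer needs special casing here.
    return VectorizedBatchUtil.createColumnVector(typeInfo);
  }
}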

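For the VectorizedRowBatchCtx refactoring earlier in this patch (partition values carried as a positional Object[] instead of a LinkedHashMap keyed by column name), a minimal sketch of how a vectorized reader might drive the new entry points is given below. It assumes an already-initialized VectorizedRowBatchCtx and the PartitionDesc of the current split; the wrapper class BatchSetupSketch, the method makeBatchForSplit, and the explicit partitionColumnCount parameter are illustrative assumptions, not part of the commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;

public class BatchSetupSketch {

  public static VectorizedRowBatch makeBatchForSplit(
      VectorizedRowBatchCtx vrbCtx, PartitionDesc partDesc,
      Configuration conf, int partitionColumnCount) {

    // Truncate the read-column include list at the highest wanted data column;
    // a null result means there is no include list to apply.
    boolean[] columnsToIncludeTruncated = vrbCtx.getColumnsToIncludeTruncated(conf);

    // Allocate column vectors for the included data columns, all partition
    // columns, and the scratch columns, then reset the batch.
    VectorizedRowBatch batch = vrbCtx.createVectorizedRowBatch(columnsToIncludeTruncated);

    // Convert the split's partition-spec strings once per split...
    Object[] partitionValues = new Object[partitionColumnCount];
    VectorizedRowBatchCtx.getPartitionValues(vrbCtx, partDesc, partitionValues);

    // ...and materialize them as repeating values in the partition columns.
    vrbCtx.addPartitionColsToBatch(batch, partitionValues);

    return batch;
  }
}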