http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java index ecd9b14..22f61ee 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java @@ -30,6 +30,9 @@ import java.util.List; import java.util.Map; import java.util.TimeZone; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; @@ -50,6 +53,7 @@ import org.apache.hadoop.hive.serde2.io.TimestampWritable; import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.HadoopShims.TextReaderShim; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.FloatWritable; @@ -62,10 +66,64 @@ import org.apache.hadoop.io.Text; */ public class TreeReaderFactory { - protected abstract static class TreeReader { + private static final Log LOG = LogFactory.getLog(TreeReaderFactory.class); + + public static class TreeReaderSchema { + + /** + * The types in the ORC file. + */ + List<OrcProto.Type> fileTypes; + + /** + * The treeReaderSchema that the reader should read as. + */ + List<OrcProto.Type> schemaTypes; + + /** + * The subtype of the row STRUCT. Different than 0 for ACID. 
+ */ + int innerStructSubtype; + + public TreeReaderSchema() { + fileTypes = null; + schemaTypes = null; + innerStructSubtype = -1; + } + + public TreeReaderSchema fileTypes(List<OrcProto.Type> fileTypes) { + this.fileTypes = fileTypes; + return this; + } + + public TreeReaderSchema schemaTypes(List<OrcProto.Type> schemaTypes) { + this.schemaTypes = schemaTypes; + return this; + } + + public TreeReaderSchema innerStructSubtype(int innerStructSubtype) { + this.innerStructSubtype = innerStructSubtype; + return this; + } + + public List<OrcProto.Type> getFileTypes() { + return fileTypes; + } + + public List<OrcProto.Type> getSchemaTypes() { + return schemaTypes; + } + + public int getInnerStructSubtype() { + return innerStructSubtype; + } + } + + public abstract static class TreeReader { protected final int columnId; protected BitFieldReader present = null; protected boolean valuePresent = false; + protected int vectorColumnCount; TreeReader(int columnId) throws IOException { this(columnId, null); @@ -79,6 +137,11 @@ public class TreeReaderFactory { } else { present = new BitFieldReader(in, 1); } + vectorColumnCount = -1; + } + + void setVectorColumnCount(int vectorColumnCount) { + this.vectorColumnCount = vectorColumnCount; } void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { @@ -1947,24 +2010,56 @@ public class TreeReaderFactory { } protected static class StructTreeReader extends TreeReader { + private final int fileColumnCount; + private final int resultColumnCount; protected final TreeReader[] fields; private final String[] fieldNames; - StructTreeReader(int columnId, - List<OrcProto.Type> types, + protected StructTreeReader( + int columnId, + TreeReaderSchema treeReaderSchema, boolean[] included, boolean skipCorrupt) throws IOException { super(columnId); - OrcProto.Type type = types.get(columnId); - int fieldCount = type.getFieldNamesCount(); - this.fields = new TreeReader[fieldCount]; - this.fieldNames = new String[fieldCount]; - for (int i = 0; i < fieldCount; ++i) { - int subtype = type.getSubtypes(i); - if (included == null || included[subtype]) { - this.fields[i] = createTreeReader(subtype, types, included, skipCorrupt); + + OrcProto.Type fileStructType = treeReaderSchema.getFileTypes().get(columnId); + fileColumnCount = fileStructType.getFieldNamesCount(); + + OrcProto.Type schemaStructType = treeReaderSchema.getSchemaTypes().get(columnId); + + if (columnId == treeReaderSchema.getInnerStructSubtype()) { + // If there are more result columns than reader columns, we will default those additional + // columns to NULL. + resultColumnCount = schemaStructType.getFieldNamesCount(); + } else { + resultColumnCount = fileColumnCount; + } + + this.fields = new TreeReader[fileColumnCount]; + this.fieldNames = new String[fileColumnCount]; + + if (included == null) { + for (int i = 0; i < fileColumnCount; ++i) { + int subtype = schemaStructType.getSubtypes(i); + this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt); + // Use the treeReaderSchema evolution name since file/reader types may not have the real column name. 
+ this.fieldNames[i] = schemaStructType.getFieldNames(i); + } + } else { + for (int i = 0; i < fileColumnCount; ++i) { + int subtype = schemaStructType.getSubtypes(i); + if (subtype >= included.length) { + throw new IOException("subtype " + subtype + " exceeds the included array size " + + included.length + " fileTypes " + treeReaderSchema.getFileTypes().toString() + + " schemaTypes " + treeReaderSchema.getSchemaTypes().toString() + + " innerStructSubtype " + treeReaderSchema.getInnerStructSubtype()); + } + if (included[subtype]) { + this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt); + } + // Use the treeReaderSchema evolution name since file/reader types may not have the real column name. + this.fieldNames[i] = schemaStructType.getFieldNames(i); } - this.fieldNames[i] = type.getFieldNames(i); } } @@ -1984,22 +2079,28 @@ public class TreeReaderFactory { OrcStruct result = null; if (valuePresent) { if (previous == null) { - result = new OrcStruct(fields.length); + result = new OrcStruct(resultColumnCount); } else { result = (OrcStruct) previous; // If the input format was initialized with a file with a // different number of fields, the number of fields needs to // be updated to the correct number - if (result.getNumFields() != fields.length) { - result.setNumFields(fields.length); + if (result.getNumFields() != resultColumnCount) { + result.setNumFields(resultColumnCount); } } - for (int i = 0; i < fields.length; ++i) { + for (int i = 0; i < fileColumnCount; ++i) { if (fields[i] != null) { result.setFieldValue(i, fields[i].next(result.getFieldValue(i))); } } + if (resultColumnCount > fileColumnCount) { + for (int i = fileColumnCount; i < resultColumnCount; ++i) { + // Default new treeReaderSchema evolution fields to NULL. + result.setFieldValue(i, null); + } + } } return result; } @@ -2008,13 +2109,13 @@ public class TreeReaderFactory { public Object nextVector(Object previousVector, long batchSize) throws IOException { final ColumnVector[] result; if (previousVector == null) { - result = new ColumnVector[fields.length]; + result = new ColumnVector[fileColumnCount]; } else { result = (ColumnVector[]) previousVector; } // Read all the members of struct as column vectors - for (int i = 0; i < fields.length; i++) { + for (int i = 0; i < fileColumnCount; i++) { if (fields[i] != null) { if (result[i] == null) { result[i] = (ColumnVector) fields[i].nextVector(null, batchSize); @@ -2023,6 +2124,19 @@ public class TreeReaderFactory { } } } + + // Default additional treeReaderSchema evolution fields to NULL. 
+ if (vectorColumnCount != -1 && vectorColumnCount > fileColumnCount) { + for (int i = fileColumnCount; i < vectorColumnCount; ++i) { + ColumnVector colVector = result[i]; + if (colVector != null) { + colVector.isRepeating = true; + colVector.noNulls = false; + colVector.isNull[0] = true; + } + } + } + return result; } @@ -2053,18 +2167,18 @@ public class TreeReaderFactory { protected final TreeReader[] fields; protected RunLengthByteReader tags; - UnionTreeReader(int columnId, - List<OrcProto.Type> types, + protected UnionTreeReader(int columnId, + TreeReaderSchema treeReaderSchema, boolean[] included, boolean skipCorrupt) throws IOException { super(columnId); - OrcProto.Type type = types.get(columnId); + OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId); int fieldCount = type.getSubtypesCount(); this.fields = new TreeReader[fieldCount]; for (int i = 0; i < fieldCount; ++i) { int subtype = type.getSubtypes(i); if (included == null || included[subtype]) { - this.fields[i] = createTreeReader(subtype, types, included, skipCorrupt); + this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt); } } } @@ -2133,13 +2247,13 @@ public class TreeReaderFactory { protected final TreeReader elementReader; protected IntegerReader lengths = null; - ListTreeReader(int columnId, - List<OrcProto.Type> types, + protected ListTreeReader(int columnId, + TreeReaderSchema treeReaderSchema, boolean[] included, boolean skipCorrupt) throws IOException { super(columnId); - OrcProto.Type type = types.get(columnId); - elementReader = createTreeReader(type.getSubtypes(0), types, included, skipCorrupt); + OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId); + elementReader = createTreeReader(type.getSubtypes(0), treeReaderSchema, included, skipCorrupt); } @Override @@ -2223,21 +2337,21 @@ public class TreeReaderFactory { protected final TreeReader valueReader; protected IntegerReader lengths = null; - MapTreeReader(int columnId, - List<OrcProto.Type> types, + protected MapTreeReader(int columnId, + TreeReaderSchema treeReaderSchema, boolean[] included, boolean skipCorrupt) throws IOException { super(columnId); - OrcProto.Type type = types.get(columnId); + OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId); int keyColumn = type.getSubtypes(0); int valueColumn = type.getSubtypes(1); if (included == null || included[keyColumn]) { - keyReader = createTreeReader(keyColumn, types, included, skipCorrupt); + keyReader = createTreeReader(keyColumn, treeReaderSchema, included, skipCorrupt); } else { keyReader = null; } if (included == null || included[valueColumn]) { - valueReader = createTreeReader(valueColumn, types, included, skipCorrupt); + valueReader = createTreeReader(valueColumn, treeReaderSchema, included, skipCorrupt); } else { valueReader = null; } @@ -2317,11 +2431,11 @@ public class TreeReaderFactory { } public static TreeReader createTreeReader(int columnId, - List<OrcProto.Type> types, + TreeReaderSchema treeReaderSchema, boolean[] included, boolean skipCorrupt ) throws IOException { - OrcProto.Type type = types.get(columnId); + OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId); switch (type.getKind()) { case BOOLEAN: return new BooleanTreeReader(columnId); @@ -2361,13 +2475,13 @@ public class TreeReaderFactory { int scale = type.hasScale() ? 
type.getScale() : HiveDecimal.SYSTEM_DEFAULT_SCALE; return new DecimalTreeReader(columnId, precision, scale); case STRUCT: - return new StructTreeReader(columnId, types, included, skipCorrupt); + return new StructTreeReader(columnId, treeReaderSchema, included, skipCorrupt); case LIST: - return new ListTreeReader(columnId, types, included, skipCorrupt); + return new ListTreeReader(columnId, treeReaderSchema, included, skipCorrupt); case MAP: - return new MapTreeReader(columnId, types, included, skipCorrupt); + return new MapTreeReader(columnId, treeReaderSchema, included, skipCorrupt); case UNION: - return new UnionTreeReader(columnId, types, included, skipCorrupt); + return new UnionTreeReader(columnId, treeReaderSchema, included, skipCorrupt); default: throw new IllegalArgumentException("Unsupported type " + type.getKind());
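For reference, a minimal sketch of how the new TreeReaderSchema builder and the changed createTreeReader() signature above fit together. This is illustration only, not part of the patch: the class and helper names are invented, and it assumes column 0 is the root STRUCT and that innerStructSubtype stays 0 for non-ACID layouts (per the javadoc on TreeReaderSchema).

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hive.ql.io.orc.OrcProto;
import org.apache.hadoop.hive.ql.io.orc.TreeReaderFactory;
import org.apache.hadoop.hive.ql.io.orc.TreeReaderFactory.TreeReaderSchema;

public class TreeReaderSchemaSketch {
  // Hypothetical helper: describe the file schema vs. the desired read schema,
  // then create the root reader for column 0 (the row STRUCT).
  static TreeReaderFactory.TreeReader createRootReader(
      List<OrcProto.Type> fileTypes,      // physical types from the ORC file footer
      List<OrcProto.Type> schemaTypes,    // logical types the reader should present
      boolean[] included,                 // column pruning mask, or null for all columns
      boolean skipCorrupt) throws IOException {
    TreeReaderSchema treeReaderSchema = new TreeReaderSchema()
        .fileTypes(fileTypes)
        .schemaTypes(schemaTypes)
        .innerStructSubtype(0);           // assumed: row STRUCT is subtype 0 when not ACID
    // Columns present only in schemaTypes are defaulted to NULL by StructTreeReader,
    // in both the row-mode next() and the vectorized nextVector() paths above.
    return TreeReaderFactory.createTreeReader(0, treeReaderSchema, included, skipCorrupt);
  }
}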
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java new file mode 100644 index 0000000..3c0d590 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java @@ -0,0 +1,514 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.orc; + +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * This is the description of the types in an ORC file. 
+ */ +public class TypeDescription { + private static final int MAX_PRECISION = 38; + private static final int MAX_SCALE = 38; + private static final int DEFAULT_PRECISION = 38; + private static final int DEFAULT_SCALE = 10; + private static final int DEFAULT_LENGTH = 256; + public enum Category { + BOOLEAN("boolean", true), + BYTE("tinyint", true), + SHORT("smallint", true), + INT("int", true), + LONG("bigint", true), + FLOAT("float", true), + DOUBLE("double", true), + STRING("string", true), + DATE("date", true), + TIMESTAMP("timestamp", true), + BINARY("binary", true), + DECIMAL("decimal", true), + VARCHAR("varchar", true), + CHAR("char", true), + LIST("array", false), + MAP("map", false), + STRUCT("struct", false), + UNION("union", false); + + Category(String name, boolean isPrimitive) { + this.name = name; + this.isPrimitive = isPrimitive; + } + + final boolean isPrimitive; + final String name; + + public boolean isPrimitive() { + return isPrimitive; + } + + public String getName() { + return name; + } + } + + public static TypeDescription createBoolean() { + return new TypeDescription(Category.BOOLEAN); + } + + public static TypeDescription createByte() { + return new TypeDescription(Category.BYTE); + } + + public static TypeDescription createShort() { + return new TypeDescription(Category.SHORT); + } + + public static TypeDescription createInt() { + return new TypeDescription(Category.INT); + } + + public static TypeDescription createLong() { + return new TypeDescription(Category.LONG); + } + + public static TypeDescription createFloat() { + return new TypeDescription(Category.FLOAT); + } + + public static TypeDescription createDouble() { + return new TypeDescription(Category.DOUBLE); + } + + public static TypeDescription createString() { + return new TypeDescription(Category.STRING); + } + + public static TypeDescription createDate() { + return new TypeDescription(Category.DATE); + } + + public static TypeDescription createTimestamp() { + return new TypeDescription(Category.TIMESTAMP); + } + + public static TypeDescription createBinary() { + return new TypeDescription(Category.BINARY); + } + + public static TypeDescription createDecimal() { + return new TypeDescription(Category.DECIMAL); + } + + /** + * For decimal types, set the precision. + * @param precision the new precision + * @return this + */ + public TypeDescription withPrecision(int precision) { + if (category != Category.DECIMAL) { + throw new IllegalArgumentException("precision is only allowed on decimal"+ + " and not " + category.name); + } else if (precision < 1 || precision > MAX_PRECISION || scale > precision){ + throw new IllegalArgumentException("precision " + precision + + " is out of range 1 .. " + scale); + } + this.precision = precision; + return this; + } + + /** + * For decimal types, set the scale. + * @param scale the new scale + * @return this + */ + public TypeDescription withScale(int scale) { + if (category != Category.DECIMAL) { + throw new IllegalArgumentException("scale is only allowed on decimal"+ + " and not " + category.name); + } else if (scale < 0 || scale > MAX_SCALE || scale > precision) { + throw new IllegalArgumentException("scale is out of range at " + scale); + } + this.scale = scale; + return this; + } + + public static TypeDescription createVarchar() { + return new TypeDescription(Category.VARCHAR); + } + + public static TypeDescription createChar() { + return new TypeDescription(Category.CHAR); + } + + /** + * Set the maximum length for char and varchar types. 
+ * @param maxLength the maximum value + * @return this + */ + public TypeDescription withMaxLength(int maxLength) { + if (category != Category.VARCHAR && category != Category.CHAR) { + throw new IllegalArgumentException("maxLength is only allowed on char" + + " and varchar and not " + category.name); + } + this.maxLength = maxLength; + return this; + } + + public static TypeDescription createList(TypeDescription childType) { + TypeDescription result = new TypeDescription(Category.LIST); + result.children.add(childType); + childType.parent = result; + return result; + } + + public static TypeDescription createMap(TypeDescription keyType, + TypeDescription valueType) { + TypeDescription result = new TypeDescription(Category.MAP); + result.children.add(keyType); + result.children.add(valueType); + keyType.parent = result; + valueType.parent = result; + return result; + } + + public static TypeDescription createUnion() { + return new TypeDescription(Category.UNION); + } + + public static TypeDescription createStruct() { + return new TypeDescription(Category.STRUCT); + } + + /** + * Add a child to a union type. + * @param child a new child type to add + * @return the union type. + */ + public TypeDescription addUnionChild(TypeDescription child) { + if (category != Category.UNION) { + throw new IllegalArgumentException("Can only add types to union type" + + " and not " + category); + } + children.add(child); + child.parent = this; + return this; + } + + /** + * Add a field to a struct type as it is built. + * @param field the field name + * @param fieldType the type of the field + * @return the struct type + */ + public TypeDescription addField(String field, TypeDescription fieldType) { + if (category != Category.STRUCT) { + throw new IllegalArgumentException("Can only add fields to struct type" + + " and not " + category); + } + fieldNames.add(field); + children.add(fieldType); + fieldType.parent = this; + return this; + } + + /** + * Get the id for this type. + * The first call will cause all of the the ids in tree to be assigned, so + * it should not be called before the type is completely built. + * @return the sequential id + */ + public int getId() { + // if the id hasn't been assigned, assign all of the ids from the root + if (id == -1) { + TypeDescription root = this; + while (root.parent != null) { + root = root.parent; + } + root.assignIds(0); + } + return id; + } + + /** + * Get the maximum id assigned to this type or its children. + * The first call will cause all of the the ids in tree to be assigned, so + * it should not be called before the type is completely built. 
+ * @return the maximum id assigned under this type + */ + public int getMaximumId() { + // if the id hasn't been assigned, assign all of the ids from the root + if (maxId == -1) { + TypeDescription root = this; + while (root.parent != null) { + root = root.parent; + } + root.assignIds(0); + } + return maxId; + } + + private ColumnVector createColumn() { + switch (category) { + case BOOLEAN: + case BYTE: + case SHORT: + case INT: + case LONG: + case TIMESTAMP: + case DATE: + return new LongColumnVector(); + case FLOAT: + case DOUBLE: + return new DoubleColumnVector(); + case DECIMAL: + return new DecimalColumnVector(precision, scale); + case STRING: + case BINARY: + case CHAR: + case VARCHAR: + return new BytesColumnVector(); + default: + throw new IllegalArgumentException("Unknown type " + category); + } + } + + public VectorizedRowBatch createRowBatch() { + VectorizedRowBatch result; + if (category == Category.STRUCT) { + result = new VectorizedRowBatch(children.size(), + VectorizedRowBatch.DEFAULT_SIZE); + for(int i=0; i < result.cols.length; ++i) { + result.cols[i] = children.get(i).createColumn(); + } + } else { + result = new VectorizedRowBatch(1, VectorizedRowBatch.DEFAULT_SIZE); + result.cols[0] = createColumn(); + } + result.reset(); + return result; + } + + /** + * Get the kind of this type. + * @return get the category for this type. + */ + public Category getCategory() { + return category; + } + + /** + * Get the maximum length of the type. Only used for char and varchar types. + * @return the maximum length of the string type + */ + public int getMaxLength() { + return maxLength; + } + + /** + * Get the precision of the decimal type. + * @return the number of digits for the precision. + */ + public int getPrecision() { + return precision; + } + + /** + * Get the scale of the decimal type. + * @return the number of digits for the scale. + */ + public int getScale() { + return scale; + } + + /** + * For struct types, get the list of field names. + * @return the list of field names. + */ + public List<String> getFieldNames() { + return Collections.unmodifiableList(fieldNames); + } + + /** + * Get the subtypes of this type. + * @return the list of children types + */ + public List<TypeDescription> getChildren() { + return children == null ? null : Collections.unmodifiableList(children); + } + + /** + * Assign ids to all of the nodes under this one. 
+ * @param startId the lowest id to assign + * @return the next available id + */ + private int assignIds(int startId) { + id = startId++; + if (children != null) { + for (TypeDescription child : children) { + startId = child.assignIds(startId); + } + } + maxId = startId - 1; + return startId; + } + + private TypeDescription(Category category) { + this.category = category; + if (category.isPrimitive) { + children = null; + } else { + children = new ArrayList<>(); + } + if (category == Category.STRUCT) { + fieldNames = new ArrayList<>(); + } else { + fieldNames = null; + } + } + + private int id = -1; + private int maxId = -1; + private TypeDescription parent; + private final Category category; + private final List<TypeDescription> children; + private final List<String> fieldNames; + private int maxLength = DEFAULT_LENGTH; + private int precision = DEFAULT_PRECISION; + private int scale = DEFAULT_SCALE; + + public void printToBuffer(StringBuilder buffer) { + buffer.append(category.name); + switch (category) { + case DECIMAL: + buffer.append('('); + buffer.append(precision); + buffer.append(','); + buffer.append(scale); + buffer.append(')'); + break; + case CHAR: + case VARCHAR: + buffer.append('('); + buffer.append(maxLength); + buffer.append(')'); + break; + case LIST: + case MAP: + case UNION: + buffer.append('<'); + for(int i=0; i < children.size(); ++i) { + if (i != 0) { + buffer.append(','); + } + children.get(i).printToBuffer(buffer); + } + buffer.append('>'); + break; + case STRUCT: + buffer.append('<'); + for(int i=0; i < children.size(); ++i) { + if (i != 0) { + buffer.append(','); + } + buffer.append(fieldNames.get(i)); + buffer.append(':'); + children.get(i).printToBuffer(buffer); + } + buffer.append('>'); + break; + default: + break; + } + } + + public String toString() { + StringBuilder buffer = new StringBuilder(); + printToBuffer(buffer); + return buffer.toString(); + } + + private void printJsonToBuffer(String prefix, StringBuilder buffer, + int indent) { + for(int i=0; i < indent; ++i) { + buffer.append(' '); + } + buffer.append(prefix); + buffer.append("{\"category\": \""); + buffer.append(category.name); + buffer.append("\", \"id\": "); + buffer.append(getId()); + buffer.append(", \"max\": "); + buffer.append(maxId); + switch (category) { + case DECIMAL: + buffer.append(", \"precision\": "); + buffer.append(precision); + buffer.append(", \"scale\": "); + buffer.append(scale); + break; + case CHAR: + case VARCHAR: + buffer.append(", \"length\": "); + buffer.append(maxLength); + break; + case LIST: + case MAP: + case UNION: + buffer.append(", \"children\": ["); + for(int i=0; i < children.size(); ++i) { + buffer.append('\n'); + children.get(i).printJsonToBuffer("", buffer, indent + 2); + if (i != children.size() - 1) { + buffer.append(','); + } + } + buffer.append("]"); + break; + case STRUCT: + buffer.append(", \"fields\": ["); + for(int i=0; i < children.size(); ++i) { + buffer.append('\n'); + children.get(i).printJsonToBuffer("\"" + fieldNames.get(i) + "\": ", + buffer, indent + 2); + if (i != children.size() - 1) { + buffer.append(','); + } + } + buffer.append(']'); + break; + default: + break; + } + buffer.append('}'); + } + + public String toJson() { + StringBuilder buffer = new StringBuilder(); + printJsonToBuffer("", buffer, 0); + return buffer.toString(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java ---------------------------------------------------------------------- 
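A short usage sketch for the new TypeDescription class added above (illustration only, not part of the patch; the class name and column layout are made up):

import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.orc.TypeDescription;

public class TypeDescriptionSketch {
  public static void main(String[] args) {
    // Build struct<id:int,name:varchar(64),amount:decimal(20,2)> with the fluent factories.
    TypeDescription schema = TypeDescription.createStruct()
        .addField("id", TypeDescription.createInt())
        .addField("name", TypeDescription.createVarchar().withMaxLength(64))
        .addField("amount", TypeDescription.createDecimal().withPrecision(20).withScale(2));

    // Ids are assigned lazily on first access: 0 for the root, then children in order.
    System.out.println(schema);                 // struct<id:int,name:varchar(64),amount:decimal(20,2)>
    System.out.println(schema.getId());         // 0
    System.out.println(schema.getMaximumId());  // 3

    // createRowBatch() allocates one ColumnVector per top-level field of a struct.
    VectorizedRowBatch batch = schema.createRowBatch();
    System.out.println(batch.cols.length);      // 3
  }
}

The JSON form (toJson()) additionally prints the assigned ids, which is useful when relating a TypeDescription tree to the flat OrcProto.Type list used elsewhere in this patch.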
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java index a8e5c2e..a2725b2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java @@ -25,7 +25,6 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.io.DataOutputBuffer; @@ -46,31 +45,25 @@ class VectorizedOrcAcidRowReader private final AcidInputFormat.RowReader<OrcStruct> innerReader; private final RecordIdentifier key; private final OrcStruct value; - private final VectorizedRowBatchCtx rowBatchCtx; + private VectorizedRowBatchCtx rbCtx; + private Object[] partitionValues; private final ObjectInspector objectInspector; private final DataOutputBuffer buffer = new DataOutputBuffer(); VectorizedOrcAcidRowReader(AcidInputFormat.RowReader<OrcStruct> inner, Configuration conf, + VectorizedRowBatchCtx vectorizedRowBatchCtx, FileSplit split) throws IOException { this.innerReader = inner; this.key = inner.createKey(); - this.rowBatchCtx = new VectorizedRowBatchCtx(); + rbCtx = vectorizedRowBatchCtx; + int partitionColumnCount = rbCtx.getPartitionColumnCount(); + if (partitionColumnCount > 0) { + partitionValues = new Object[partitionColumnCount]; + rbCtx.getPartitionValues(rbCtx, conf, split, partitionValues); + } this.value = inner.createValue(); this.objectInspector = inner.getObjectInspector(); - try { - rowBatchCtx.init(conf, split); - } catch (ClassNotFoundException e) { - throw new IOException("Failed to initialize context", e); - } catch (SerDeException e) { - throw new IOException("Failed to initialize context", e); - } catch (InstantiationException e) { - throw new IOException("Failed to initialize context", e); - } catch (IllegalAccessException e) { - throw new IOException("Failed to initialize context", e); - } catch (HiveException e) { - throw new IOException("Failed to initialize context", e); - } } @Override @@ -82,23 +75,21 @@ class VectorizedOrcAcidRowReader if (!innerReader.next(key, value)) { return false; } - try { - rowBatchCtx.addPartitionColsToBatch(vectorizedRowBatch); - } catch (HiveException e) { - throw new IOException("Problem adding partition column", e); + if (partitionValues != null) { + rbCtx.addPartitionColsToBatch(vectorizedRowBatch, partitionValues); } try { VectorizedBatchUtil.acidAddRowToBatch(value, (StructObjectInspector) objectInspector, - vectorizedRowBatch.size++, vectorizedRowBatch, rowBatchCtx, buffer); + vectorizedRowBatch.size++, vectorizedRowBatch, rbCtx, buffer); while (vectorizedRowBatch.size < vectorizedRowBatch.selected.length && innerReader.next(key, value)) { VectorizedBatchUtil.acidAddRowToBatch(value, (StructObjectInspector) objectInspector, - vectorizedRowBatch.size++, vectorizedRowBatch, rowBatchCtx, buffer); + vectorizedRowBatch.size++, vectorizedRowBatch, rbCtx, buffer); } - } catch (HiveException he) { - throw new IOException("error iterating", he); + } catch (Exception e) { + throw new IOException("error iterating", e); } return true; } @@ -110,11 +101,7 @@ 
class VectorizedOrcAcidRowReader @Override public VectorizedRowBatch createValue() { - try { - return rowBatchCtx.createVectorizedRowBatch(); - } catch (HiveException e) { - throw new RuntimeException("Error creating a batch", e); - } + return rbCtx.createVectorizedRowBatch(); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java index bf09001..d90425a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.io.orc; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -27,14 +26,12 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.InputFormatChecker; -import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileSplit; @@ -47,7 +44,8 @@ import org.apache.hadoop.mapred.Reporter; * A MapReduce/Hive input format for ORC files. */ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, VectorizedRowBatch> - implements InputFormatChecker, VectorizedInputFormatInterface { + implements InputFormatChecker, VectorizedInputFormatInterface, + SelfDescribingInputFormatInterface { static class VectorizedOrcRecordReader implements RecordReader<NullWritable, VectorizedRowBatch> { @@ -56,12 +54,29 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect private final long length; private float progress = 0.0f; private VectorizedRowBatchCtx rbCtx; + private final boolean[] columnsToIncludeTruncated; + private final Object[] partitionValues; private boolean addPartitionCols = true; VectorizedOrcRecordReader(Reader file, Configuration conf, FileSplit fileSplit) throws IOException { + + // if HiveCombineInputFormat gives us FileSplits instead of OrcSplits, + // we know it is not ACID. (see a check in CombineHiveInputFormat.getSplits() that assures this). + // + // Why would an ACID table reach here instead of VectorizedOrcAcidRowReader? + // OrcInputFormat.getRecordReader will use this reader for original files that have no deltas. + // + boolean isAcid = (fileSplit instanceof OrcSplit); + + /** + * Do we have schema on read in the configuration variables? 
+ */ + TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf, isAcid); + List<OrcProto.Type> types = file.getTypes(); Reader.Options options = new Reader.Options(); + options.schema(schema); this.offset = fileSplit.getStart(); this.length = fileSplit.getLength(); options.range(offset, length); @@ -69,11 +84,17 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect OrcInputFormat.setSearchArgument(options, types, conf, true); this.reader = file.rowsOptions(options); - try { - rbCtx = new VectorizedRowBatchCtx(); - rbCtx.init(conf, fileSplit); - } catch (Exception e) { - throw new RuntimeException(e); + + rbCtx = Utilities.getVectorizedRowBatchCtx(conf); + + columnsToIncludeTruncated = rbCtx.getColumnsToIncludeTruncated(conf); + + int partitionColumnCount = rbCtx.getPartitionColumnCount(); + if (partitionColumnCount > 0) { + partitionValues = new Object[partitionColumnCount]; + rbCtx.getPartitionValues(rbCtx, conf, fileSplit, partitionValues); + } else { + partitionValues = null; } } @@ -90,7 +111,9 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect // as this does not call CreateValue for each new RecordReader it creates, this check is // required in next() if (addPartitionCols) { - rbCtx.addPartitionColsToBatch(value); + if (partitionValues != null) { + rbCtx.addPartitionColsToBatch(value, partitionValues); + } addPartitionCols = false; } reader.nextBatch(value); @@ -108,11 +131,7 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect @Override public VectorizedRowBatch createValue() { - try { - return rbCtx.createVectorizedRowBatch(); - } catch (HiveException e) { - throw new RuntimeException("Error creating a batch", e); - } + return rbCtx.createVectorizedRowBatch(columnsToIncludeTruncated); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java index ed99615..c6070c6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java @@ -14,8 +14,11 @@ package org.apache.hadoop.hive.ql.io.parquet; import java.io.IOException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.VectorColumnAssign; import org.apache.hadoop.hive.ql.exec.vector.VectorColumnAssignFactory; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; @@ -23,6 +26,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; @@ -32,7 +36,6 @@ import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; - import 
org.apache.parquet.hadoop.ParquetInputFormat; /** @@ -52,6 +55,7 @@ public class VectorizedParquetInputFormat extends FileInputFormat<NullWritable, private final ParquetRecordReaderWrapper internalReader; private VectorizedRowBatchCtx rbCtx; + private Object[] partitionValues; private ArrayWritable internalValues; private NullWritable internalKey; private VectorColumnAssign[] assigners; @@ -65,11 +69,11 @@ public class VectorizedParquetInputFormat extends FileInputFormat<NullWritable, split, conf, reporter); - try { - rbCtx = new VectorizedRowBatchCtx(); - rbCtx.init(conf, split); - } catch (Exception e) { - throw new RuntimeException(e); + rbCtx = Utilities.getVectorizedRowBatchCtx(conf); + int partitionColumnCount = rbCtx.getPartitionColumnCount(); + if (partitionColumnCount > 0) { + partitionValues = new Object[partitionColumnCount]; + rbCtx.getPartitionValues(rbCtx, conf, split, partitionValues); } } @@ -81,13 +85,9 @@ public class VectorizedParquetInputFormat extends FileInputFormat<NullWritable, @Override public VectorizedRowBatch createValue() { - VectorizedRowBatch outputBatch = null; - try { - outputBatch = rbCtx.createVectorizedRowBatch(); - internalValues = internalReader.createValue(); - } catch (HiveException e) { - throw new RuntimeException("Error creating a batch", e); - } + VectorizedRowBatch outputBatch; + outputBatch = rbCtx.createVectorizedRowBatch(); + internalValues = internalReader.createValue(); return outputBatch; } http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 5708cb8..40d0e34 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -109,6 +109,8 @@ import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.TezWork; import org.apache.hadoop.hive.ql.stats.StatsFactory; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.mapred.InputFormat; @@ -700,6 +702,11 @@ public final class GenMapRedUtils { parseCtx.getGlobalLimitCtx().disableOpt(); } + if (topOp instanceof TableScanOperator) { + Utilities.addSchemaEvolutionToTableScanOperator(partsList.getSourceTable(), + (TableScanOperator) topOp); + } + Iterator<Path> iterPath = partDir.iterator(); Iterator<PartitionDesc> iterPartnDesc = partDesc.iterator(); @@ -761,6 +768,7 @@ public final class GenMapRedUtils { * whether you need to add to map-reduce or local work * @param tt_desc * table descriptor + * @throws SerDeException */ public static void setTaskPlan(String path, String alias, Operator<? 
extends OperatorDesc> topOp, MapWork plan, boolean local, @@ -770,6 +778,16 @@ public final class GenMapRedUtils { return; } + if (topOp instanceof TableScanOperator) { + try { + Utilities.addSchemaEvolutionToTableScanOperator( + (StructObjectInspector) tt_desc.getDeserializer().getObjectInspector(), + (TableScanOperator) topOp); + } catch (Exception e) { + throw new SemanticException(e); + } + } + if (!local) { if (plan.getPathToAliases().get(path) == null) { plan.getPathToAliases().put(path, new ArrayList<String>()); http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java index 2af6f9a..20e1ee6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java @@ -370,6 +370,7 @@ public class SimpleFetchOptimizer implements Transform { private FetchWork convertToWork() throws HiveException { inputs.clear(); + Utilities.addSchemaEvolutionToTableScanOperator(table, scanOp); TableDesc tableDesc = Utilities.getTableDesc(table); if (!table.isPartitioned()) { inputs.add(new ReadEntity(table, parent, !table.isView() && parent == null)); http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index 20f9400..5d010cc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -26,6 +26,7 @@ import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Properties; import java.util.Set; import java.util.Stack; @@ -33,6 +34,7 @@ import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.*; @@ -63,6 +65,11 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext.InConstantType import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; +import org.apache.hadoop.hive.ql.exec.vector.expressions.IdentityExpression; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker; import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; import org.apache.hadoop.hive.ql.lib.Dispatcher; @@ -88,6 +95,7 @@ import org.apache.hadoop.hive.ql.plan.JoinDesc; import 
org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.VectorPartitionConversion; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.SMBJoinDesc; @@ -100,6 +108,7 @@ import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc; import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableImplementationType; import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType; import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKind; +import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.ql.udf.UDFAcos; import org.apache.hadoop.hive.ql.udf.UDFAsin; @@ -149,6 +158,8 @@ import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import com.google.common.base.Joiner; + public class Vectorizer implements PhysicalPlanResolver { protected static transient final Log LOG = LogFactory.getLog(Vectorizer.class); @@ -311,17 +322,51 @@ public class Vectorizer implements PhysicalPlanResolver { supportedAggregationUdfs.add("stddev_samp"); } + private class VectorTaskColumnInfo { + List<String> columnNames; + List<TypeInfo> typeInfos; + int partitionColumnCount; + + String[] scratchTypeNameArray; + + VectorTaskColumnInfo() { + partitionColumnCount = 0; + } + + public void setColumnNames(List<String> columnNames) { + this.columnNames = columnNames; + } + public void setTypeInfos(List<TypeInfo> typeInfos) { + this.typeInfos = typeInfos; + } + public void setPartitionColumnCount(int partitionColumnCount) { + this.partitionColumnCount = partitionColumnCount; + } + public void setScratchTypeNameArray(String[] scratchTypeNameArray) { + this.scratchTypeNameArray = scratchTypeNameArray; + } + + public void transferToBaseWork(BaseWork baseWork) { + + String[] columnNameArray = columnNames.toArray(new String[0]); + TypeInfo[] typeInfoArray = typeInfos.toArray(new TypeInfo[0]); + + VectorizedRowBatchCtx vectorizedRowBatchCtx = + new VectorizedRowBatchCtx( + columnNameArray, + typeInfoArray, + partitionColumnCount, + scratchTypeNameArray); + baseWork.setVectorizedRowBatchCtx(vectorizedRowBatchCtx); + } + } + class VectorizationDispatcher implements Dispatcher { private final PhysicalContext physicalContext; - private List<String> reduceColumnNames; - private List<TypeInfo> reduceTypeInfos; - public VectorizationDispatcher(PhysicalContext physicalContext) { this.physicalContext = physicalContext; - reduceColumnNames = null; - reduceTypeInfos = null; } @Override @@ -359,9 +404,10 @@ public class Vectorizer implements PhysicalPlanResolver { } private void convertMapWork(MapWork mapWork, boolean isTez) throws SemanticException { - boolean ret = validateMapWork(mapWork, isTez); + VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo(); + boolean ret = validateMapWork(mapWork, vectorTaskColumnInfo, isTez); if (ret) { - vectorizeMapWork(mapWork, isTez); + vectorizeMapWork(mapWork, vectorTaskColumnInfo, isTez); } } @@ -372,40 +418,262 @@ public class Vectorizer implements PhysicalPlanResolver { + ReduceSinkOperator.getOperatorName()), np); } - private boolean validateMapWork(MapWork mapWork, boolean isTez) throws SemanticException { - LOG.info("Validating MapWork..."); + 
private ImmutablePair<String, TableScanOperator> verifyOnlyOneTableScanOperator(MapWork mapWork) { // Eliminate MR plans with more than one TableScanOperator. + LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork = mapWork.getAliasToWork(); if ((aliasToWork == null) || (aliasToWork.size() == 0)) { - return false; + return null; } int tableScanCount = 0; - for (Operator<?> op : aliasToWork.values()) { + String alias = ""; + TableScanOperator tableScanOperator = null; + for (Entry<String, Operator<? extends OperatorDesc>> entry : aliasToWork.entrySet()) { + Operator<?> op = entry.getValue(); if (op == null) { LOG.warn("Map work has invalid aliases to work with. Fail validation!"); - return false; + return null; } if (op instanceof TableScanOperator) { tableScanCount++; + alias = entry.getKey(); + tableScanOperator = (TableScanOperator) op; } } if (tableScanCount > 1) { - LOG.warn("Map work has more than 1 TableScanOperator aliases to work with. Fail validation!"); - return false; + LOG.warn("Map work has more than 1 TableScanOperator. Fail validation!"); + return null; + } + return new ImmutablePair(alias, tableScanOperator); + } + + private void getTableScanOperatorSchemaInfo(TableScanOperator tableScanOperator, + List<String> logicalColumnNameList, List<TypeInfo> logicalTypeInfoList) { + + TableScanDesc tableScanDesc = tableScanOperator.getConf(); + + // Add all non-virtual columns to make a vectorization context for + // the TableScan operator. + RowSchema rowSchema = tableScanOperator.getSchema(); + for (ColumnInfo c : rowSchema.getSignature()) { + // Validation will later exclude vectorization of virtual columns usage (HIVE-5560). + if (!isVirtualColumn(c)) { + String columnName = c.getInternalName(); + String typeName = c.getTypeName(); + TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeName); + + logicalColumnNameList.add(columnName); + logicalTypeInfoList.add(typeInfo); + } + } + } + + private String getColumns(List<String> columnNames, int start, int length, + Character separator) { + return Joiner.on(separator).join(columnNames.subList(start, start + length)); + } + + private String getTypes(List<TypeInfo> typeInfos, int start, int length) { + return TypeInfoUtils.getTypesString(typeInfos.subList(start, start + length)); + } + + private boolean verifyAndSetVectorPartDesc(PartitionDesc pd) { + + // Look for Pass-Thru case where InputFileFormat has VectorizedInputFormatInterface + // and reads VectorizedRowBatch as a "row". + + if (Utilities.isInputFileFormatVectorized(pd)) { + + pd.setVectorPartitionDesc(VectorPartitionDesc.createVectorizedInputFileFormat()); + + return true; } + LOG.info("Input format: " + pd.getInputFileFormatClassName() + + ", doesn't provide vectorized input"); + + return false; + } + + private boolean validateInputFormatAndSchemaEvolution(MapWork mapWork, String alias, + TableScanOperator tableScanOperator, VectorTaskColumnInfo vectorTaskColumnInfo) { + + // These names/types are the data columns plus partition columns. + final List<String> allColumnNameList = new ArrayList<String>(); + final List<TypeInfo> allTypeInfoList = new ArrayList<TypeInfo>(); + + getTableScanOperatorSchemaInfo(tableScanOperator, allColumnNameList, allTypeInfoList); + final int allColumnCount = allColumnNameList.size(); + + // Validate input format and schema evolution capability. + + // For the table, enter a null value in the multi-key map indicating no conversion necessary + // if the schema matches the table. 
+ + HashMap<ImmutablePair, boolean[]> conversionMap = new HashMap<ImmutablePair, boolean[]>(); + + boolean isFirst = true; + int dataColumnCount = 0; + int partitionColumnCount = 0; + + List<String> dataColumnList = null; + String dataColumnsString = ""; + List<TypeInfo> dataTypeInfoList = null; + // Validate the input format - for (String path : mapWork.getPathToPartitionInfo().keySet()) { - PartitionDesc pd = mapWork.getPathToPartitionInfo().get(path); - List<Class<?>> interfaceList = - Arrays.asList(pd.getInputFileFormatClass().getInterfaces()); - if (!interfaceList.contains(VectorizedInputFormatInterface.class)) { - LOG.info("Input format: " + pd.getInputFileFormatClassName() - + ", doesn't provide vectorized input"); + VectorPartitionConversion partitionConversion = new VectorPartitionConversion(); + LinkedHashMap<String, ArrayList<String>> pathToAliases = mapWork.getPathToAliases(); + LinkedHashMap<String, PartitionDesc> pathToPartitionInfo = mapWork.getPathToPartitionInfo(); + for (Entry<String, ArrayList<String>> entry: pathToAliases.entrySet()) { + String path = entry.getKey(); + List<String> aliases = entry.getValue(); + boolean isPresent = (aliases != null && aliases.indexOf(alias) != -1); + if (!isPresent) { + LOG.info("Alias " + alias + " not present in aliases " + aliases); + return false; + } + PartitionDesc partDesc = pathToPartitionInfo.get(path); + if (partDesc.getVectorPartitionDesc() != null) { + // We seen this already. + continue; + } + if (!verifyAndSetVectorPartDesc(partDesc)) { return false; } + VectorPartitionDesc vectorPartDesc = partDesc.getVectorPartitionDesc(); + LOG.info("Vectorizer path: " + path + ", read type " + + vectorPartDesc.getVectorMapOperatorReadType().name() + ", aliases " + aliases); + + Properties partProps = partDesc.getProperties(); + + String nextDataColumnsString = + partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS); + String[] nextDataColumns = nextDataColumnsString.split(","); + + String nextDataTypesString = + partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMN_TYPES); + + // We convert to an array of TypeInfo using a library routine since it parses the information + // and can handle use of different separators, etc. We cannot use the raw type string + // for comparison in the map because of the different separators used. + List<TypeInfo> nextDataTypeInfoList = + TypeInfoUtils.getTypeInfosFromTypeString(nextDataTypesString); + + if (isFirst) { + + // We establish with the first one whether the table is partitioned or not. + + LinkedHashMap<String, String> partSpec = partDesc.getPartSpec(); + if (partSpec != null && partSpec.size() > 0) { + partitionColumnCount = partSpec.size(); + dataColumnCount = allColumnCount - partitionColumnCount; + } else { + partitionColumnCount = 0; + dataColumnCount = allColumnCount; + } + + dataColumnList = allColumnNameList.subList(0, dataColumnCount); + dataColumnsString = getColumns(allColumnNameList, 0, dataColumnCount, ','); + dataTypeInfoList = allTypeInfoList.subList(0, dataColumnCount); + + // Add the table (non-partitioned) columns and types into the map as not needing + // conversion (i.e. null). 
+ conversionMap.put( + new ImmutablePair(dataColumnsString, dataTypeInfoList), null); + + isFirst = false; + } + + ImmutablePair columnNamesAndTypesCombination = + new ImmutablePair(nextDataColumnsString, nextDataTypeInfoList); + + boolean[] conversionFlags; + if (conversionMap.containsKey(columnNamesAndTypesCombination)) { + + conversionFlags = conversionMap.get(columnNamesAndTypesCombination); + + } else { + + List<String> nextDataColumnList = Arrays.asList(nextDataColumns); + + // Validate the column names that are present are the same. Missing columns will be + // implicitly defaulted to null. + + if (nextDataColumnList.size() > dataColumnList.size()) { + LOG.info( + String.format("Could not vectorize partition %s. The partition column names %d is greater than the number of table columns %d", + path, nextDataColumnList.size(), dataColumnList.size())); + return false; + } + for (int i = 0; i < nextDataColumnList.size(); i++) { + String nextColumnName = nextDataColumnList.get(i); + String tableColumnName = dataColumnList.get(i); + if (!nextColumnName.equals(tableColumnName)) { + LOG.info( + String.format("Could not vectorize partition %s. The partition column name %s is does not match table column name %s", + path, nextColumnName, tableColumnName)); + return false; + } + } + + // The table column types might have been changed with ALTER. There are restrictions + // here for vectorization. + + // Some readers / deserializers take responsibility for conversion themselves. + + // If we need to check for conversion, the conversion object may come back null + // indicating from a vectorization point of view the conversion is implicit. That is, + // all implicit integer upgrades. + + if (vectorPartDesc.getNeedsDataTypeConversionCheck() && + !nextDataTypeInfoList.equals(dataTypeInfoList)) { + + // The results will be in 2 members: validConversion and conversionFlags + partitionConversion.validateConversion(nextDataTypeInfoList, dataTypeInfoList); + if (!partitionConversion.getValidConversion()) { + return false; + } + conversionFlags = partitionConversion.getResultConversionFlags(); + } else { + conversionFlags = null; + } + + // We enter this in our map so we don't have to check again for subsequent partitions. + + conversionMap.put(columnNamesAndTypesCombination, conversionFlags); + } + + vectorPartDesc.setConversionFlags(conversionFlags); + + vectorPartDesc.setTypeInfos(nextDataTypeInfoList); + } + + vectorTaskColumnInfo.setColumnNames(allColumnNameList); + vectorTaskColumnInfo.setTypeInfos(allTypeInfoList); + vectorTaskColumnInfo.setPartitionColumnCount(partitionColumnCount); + + return true; + } + + private boolean validateMapWork(MapWork mapWork, VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez) + throws SemanticException { + + LOG.info("Validating MapWork..."); + + ImmutablePair<String,TableScanOperator> pair = verifyOnlyOneTableScanOperator(mapWork); + if (pair == null) { + return false; + } + String alias = pair.left; + TableScanOperator tableScanOperator = pair.right; + + // This call fills in the column names, types, and partition column count in + // vectorTaskColumnInfo. 
+ if (!validateInputFormatAndSchemaEvolution(mapWork, alias, tableScanOperator, vectorTaskColumnInfo)) { + return false; } + Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>(); MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor(mapWork, isTez); addMapWorkRules(opRules, vnp); @@ -427,11 +695,14 @@ public class Vectorizer implements PhysicalPlanResolver { return true; } - private void vectorizeMapWork(MapWork mapWork, boolean isTez) throws SemanticException { + private void vectorizeMapWork(MapWork mapWork, VectorTaskColumnInfo vectorTaskColumnInfo, + boolean isTez) throws SemanticException { + LOG.info("Vectorizing MapWork..."); mapWork.setVectorMode(true); Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>(); - MapWorkVectorizationNodeProcessor vnp = new MapWorkVectorizationNodeProcessor(mapWork, isTez); + MapWorkVectorizationNodeProcessor vnp = + new MapWorkVectorizationNodeProcessor(mapWork, isTez, vectorTaskColumnInfo); addMapWorkRules(opRules, vnp); Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null); GraphWalker ogw = new PreOrderWalker(disp); @@ -441,9 +712,9 @@ public class Vectorizer implements PhysicalPlanResolver { HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>(); ogw.startWalking(topNodes, nodeOutput); - mapWork.setVectorColumnNameMap(vnp.getVectorColumnNameMap()); - mapWork.setVectorColumnTypeMap(vnp.getVectorColumnTypeMap()); - mapWork.setVectorScratchColumnTypeMap(vnp.getVectorScratchColumnTypeMap()); + vectorTaskColumnInfo.setScratchTypeNameArray(vnp.getVectorScratchColumnTypeNames()); + + vectorTaskColumnInfo.transferToBaseWork(mapWork); if (LOG.isDebugEnabled()) { debugDisplayAllMaps(mapWork); @@ -453,13 +724,19 @@ public class Vectorizer implements PhysicalPlanResolver { } private void convertReduceWork(ReduceWork reduceWork, boolean isTez) throws SemanticException { - boolean ret = validateReduceWork(reduceWork); + VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo(); + boolean ret = validateReduceWork(reduceWork, vectorTaskColumnInfo, isTez); if (ret) { - vectorizeReduceWork(reduceWork, isTez); + vectorizeReduceWork(reduceWork, vectorTaskColumnInfo, isTez); } } - private boolean getOnlyStructObjectInspectors(ReduceWork reduceWork) throws SemanticException { + private boolean getOnlyStructObjectInspectors(ReduceWork reduceWork, + VectorTaskColumnInfo vectorTaskColumnInfo) throws SemanticException { + + ArrayList<String> reduceColumnNames = new ArrayList<String>(); + ArrayList<TypeInfo> reduceTypeInfos = new ArrayList<TypeInfo>(); + try { // Check key ObjectInspector. ObjectInspector keyObjectInspector = reduceWork.getKeyObjectInspector(); @@ -483,9 +760,6 @@ public class Vectorizer implements PhysicalPlanResolver { StructObjectInspector valueStructObjectInspector = (StructObjectInspector)valueObjectInspector; List<? extends StructField> valueFields = valueStructObjectInspector.getAllStructFieldRefs(); - reduceColumnNames = new ArrayList<String>(); - reduceTypeInfos = new ArrayList<TypeInfo>(); - for (StructField field: keyFields) { reduceColumnNames.add(Utilities.ReduceField.KEY.toString() + "." 
+ field.getFieldName()); reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName())); @@ -497,6 +771,10 @@ public class Vectorizer implements PhysicalPlanResolver { } catch (Exception e) { throw new SemanticException(e); } + + vectorTaskColumnInfo.setColumnNames(reduceColumnNames); + vectorTaskColumnInfo.setTypeInfos(reduceTypeInfos); + return true; } @@ -505,11 +783,13 @@ public class Vectorizer implements PhysicalPlanResolver { opRules.put(new RuleRegExp("R2", SelectOperator.getOperatorName() + ".*"), np); } - private boolean validateReduceWork(ReduceWork reduceWork) throws SemanticException { + private boolean validateReduceWork(ReduceWork reduceWork, + VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez) throws SemanticException { + LOG.info("Validating ReduceWork..."); // Validate input to ReduceWork. - if (!getOnlyStructObjectInspectors(reduceWork)) { + if (!getOnlyStructObjectInspectors(reduceWork, vectorTaskColumnInfo)) { return false; } // Now check the reduce operator tree. @@ -533,7 +813,9 @@ public class Vectorizer implements PhysicalPlanResolver { return true; } - private void vectorizeReduceWork(ReduceWork reduceWork, boolean isTez) throws SemanticException { + private void vectorizeReduceWork(ReduceWork reduceWork, + VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez) throws SemanticException { + LOG.info("Vectorizing ReduceWork..."); reduceWork.setVectorMode(true); @@ -542,7 +824,7 @@ public class Vectorizer implements PhysicalPlanResolver { // VectorizationContext... Do we use PreOrderWalker instead of DefaultGraphWalker. Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>(); ReduceWorkVectorizationNodeProcessor vnp = - new ReduceWorkVectorizationNodeProcessor(reduceColumnNames, reduceTypeInfos, isTez); + new ReduceWorkVectorizationNodeProcessor(vectorTaskColumnInfo, isTez); addReduceWorkRules(opRules, vnp); Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null); GraphWalker ogw = new PreOrderWalker(disp); @@ -557,9 +839,9 @@ public class Vectorizer implements PhysicalPlanResolver { // Necessary since we are vectorizing the root operator in reduce. reduceWork.setReducer(vnp.getRootVectorOp()); - reduceWork.setVectorColumnNameMap(vnp.getVectorColumnNameMap()); - reduceWork.setVectorColumnTypeMap(vnp.getVectorColumnTypeMap()); - reduceWork.setVectorScratchColumnTypeMap(vnp.getVectorScratchColumnTypeMap()); + vectorTaskColumnInfo.setScratchTypeNameArray(vnp.getVectorScratchColumnTypeNames()); + + vectorTaskColumnInfo.transferToBaseWork(reduceWork); if (LOG.isDebugEnabled()) { debugDisplayAllMaps(reduceWork); @@ -627,23 +909,11 @@ public class Vectorizer implements PhysicalPlanResolver { // The vectorization context for the Map or Reduce task. protected VectorizationContext taskVectorizationContext; - // The input projection column type name map for the Map or Reduce task. 
- protected Map<Integer, String> taskColumnTypeNameMap; - VectorizationNodeProcessor() { - taskColumnTypeNameMap = new HashMap<Integer, String>(); - } - - public Map<String, Integer> getVectorColumnNameMap() { - return taskVectorizationContext.getProjectionColumnMap(); } - public Map<Integer, String> getVectorColumnTypeMap() { - return taskColumnTypeNameMap; - } - - public Map<Integer, String> getVectorScratchColumnTypeMap() { - return taskVectorizationContext.getScratchColumnTypeMap(); + public String[] getVectorScratchColumnTypeNames() { + return taskVectorizationContext.getScratchColumnTypeNames(); } protected final Set<Operator<? extends OperatorDesc>> opsDone = @@ -713,11 +983,14 @@ public class Vectorizer implements PhysicalPlanResolver { class MapWorkVectorizationNodeProcessor extends VectorizationNodeProcessor { private final MapWork mWork; + private VectorTaskColumnInfo vectorTaskColumnInfo; private final boolean isTez; - public MapWorkVectorizationNodeProcessor(MapWork mWork, boolean isTez) { + public MapWorkVectorizationNodeProcessor(MapWork mWork, boolean isTez, + VectorTaskColumnInfo vectorTaskColumnInfo) { super(); this.mWork = mWork; + this.vectorTaskColumnInfo = vectorTaskColumnInfo; this.isTez = isTez; } @@ -731,8 +1004,7 @@ public class Vectorizer implements PhysicalPlanResolver { if (op instanceof TableScanOperator) { if (taskVectorizationContext == null) { - taskVectorizationContext = getVectorizationContext(op.getSchema(), op.getName(), - taskColumnTypeNameMap); + taskVectorizationContext = getVectorizationContext(op.getName(), vectorTaskColumnInfo); } vContext = taskVectorizationContext; } else { @@ -773,8 +1045,7 @@ public class Vectorizer implements PhysicalPlanResolver { class ReduceWorkVectorizationNodeProcessor extends VectorizationNodeProcessor { - private final List<String> reduceColumnNames; - private final List<TypeInfo> reduceTypeInfos; + private VectorTaskColumnInfo vectorTaskColumnInfo; private boolean isTez; @@ -784,11 +1055,11 @@ public class Vectorizer implements PhysicalPlanResolver { return rootVectorOp; } - public ReduceWorkVectorizationNodeProcessor(List<String> reduceColumnNames, - List<TypeInfo> reduceTypeInfos, boolean isTez) { + public ReduceWorkVectorizationNodeProcessor(VectorTaskColumnInfo vectorTaskColumnInfo, + boolean isTez) { + super(); - this.reduceColumnNames = reduceColumnNames; - this.reduceTypeInfos = reduceTypeInfos; + this.vectorTaskColumnInfo = vectorTaskColumnInfo; rootVectorOp = null; this.isTez = isTez; } @@ -804,15 +1075,11 @@ public class Vectorizer implements PhysicalPlanResolver { boolean saveRootVectorOp = false; if (op.getParentOperators().size() == 0) { - LOG.info("ReduceWorkVectorizationNodeProcessor process reduceColumnNames " + reduceColumnNames.toString()); + LOG.info("ReduceWorkVectorizationNodeProcessor process reduceColumnNames " + vectorTaskColumnInfo.columnNames.toString()); - vContext = new VectorizationContext("__Reduce_Shuffle__", reduceColumnNames); + vContext = new VectorizationContext("__Reduce_Shuffle__", vectorTaskColumnInfo.columnNames); taskVectorizationContext = vContext; - int i = 0; - for (TypeInfo typeInfo : reduceTypeInfos) { - taskColumnTypeNameMap.put(i, typeInfo.getTypeName()); - i++; - } + saveRootVectorOp = true; if (LOG.isDebugEnabled()) { @@ -881,7 +1148,7 @@ public class Vectorizer implements PhysicalPlanResolver { @Override public PhysicalContext resolve(PhysicalContext physicalContext) throws SemanticException { - this.physicalContext = physicalContext; + hiveConf = 
physicalContext.getConf(); boolean vectorPath = HiveConf.getBoolVar(hiveConf, @@ -1022,65 +1289,6 @@ public class Vectorizer implements PhysicalPlanResolver { return false; } - String columns = ""; - String types = ""; - String partitionColumns = ""; - String partitionTypes = ""; - boolean haveInfo = false; - - // This over-reaches slightly, since we can have > 1 table-scan per map-work. - // It needs path to partition, path to alias, then check the alias == the same table-scan, to be accurate. - // That said, that is a TODO item to be fixed when we support >1 TableScans per vectorized pipeline later. - LinkedHashMap<String, PartitionDesc> partitionDescs = mWork.getPathToPartitionInfo(); - - // For vectorization, compare each partition information for against the others. - // We assume the table information will be from one of the partitions, so it will - // work to focus on the partition information and not compare against the TableScanOperator - // columns (in the VectorizationContext).... - for (Map.Entry<String, PartitionDesc> entry : partitionDescs.entrySet()) { - PartitionDesc partDesc = entry.getValue(); - if (partDesc.getPartSpec() == null || partDesc.getPartSpec().isEmpty()) { - // No partition information -- we match because we would default to using the table description. - continue; - } - Properties partProps = partDesc.getProperties(); - if (!haveInfo) { - columns = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS); - types = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMN_TYPES); - partitionColumns = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS); - partitionTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES); - haveInfo = true; - } else { - String nextColumns = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS); - String nextTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMN_TYPES); - String nextPartitionColumns = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS); - String nextPartitionTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES); - if (!columns.equalsIgnoreCase(nextColumns)) { - LOG.info( - String.format("Could not vectorize partition %s. Its column names %s do not match the other column names %s", - entry.getKey(), nextColumns, columns)); - return false; - } - if (!types.equalsIgnoreCase(nextTypes)) { - LOG.info( - String.format("Could not vectorize partition %s. Its column types %s do not match the other column types %s", - entry.getKey(), nextTypes, types)); - return false; - } - if (!partitionColumns.equalsIgnoreCase(nextPartitionColumns)) { - LOG.info( - String.format("Could not vectorize partition %s. Its partition column names %s do not match the other partition column names %s", - entry.getKey(), nextPartitionColumns, partitionColumns)); - return false; - } - if (!partitionTypes.equalsIgnoreCase(nextPartitionTypes)) { - LOG.info( - String.format("Could not vectorize partition %s. 
Its partition column types %s do not match the other partition column types %s", - entry.getKey(), nextPartitionTypes, partitionTypes)); - return false; - } - } - } return true; } @@ -1412,23 +1620,10 @@ public class Vectorizer implements PhysicalPlanResolver { return result; } - private VectorizationContext getVectorizationContext(RowSchema rowSchema, String contextName, - Map<Integer, String> typeNameMap) { + private VectorizationContext getVectorizationContext(String contextName, + VectorTaskColumnInfo vectorTaskColumnInfo) { - VectorizationContext vContext = new VectorizationContext(contextName); - - // Add all non-virtual columns to make a vectorization context for - // the TableScan operator. - int i = 0; - for (ColumnInfo c : rowSchema.getSignature()) { - // Earlier, validation code should have eliminated virtual columns usage (HIVE-5560). - if (!isVirtualColumn(c)) { - vContext.addInitialColumn(c.getInternalName()); - typeNameMap.put(i, c.getTypeName()); - i++; - } - } - vContext.finishedAddingInitialColumns(); + VectorizationContext vContext = new VectorizationContext(contextName, vectorTaskColumnInfo.columnNames); return vContext; } @@ -1785,12 +1980,16 @@ public class Vectorizer implements PhysicalPlanResolver { public void debugDisplayAllMaps(BaseWork work) { - Map<String, Integer> columnNameMap = work.getVectorColumnNameMap(); - Map<Integer, String> columnTypeMap = work.getVectorColumnTypeMap(); - Map<Integer, String> scratchColumnTypeMap = work.getVectorScratchColumnTypeMap(); + VectorizedRowBatchCtx vectorizedRowBatchCtx = work.getVectorizedRowBatchCtx(); + + String[] columnNames = vectorizedRowBatchCtx.getRowColumnNames(); + Object columnTypeInfos = vectorizedRowBatchCtx.getRowColumnTypeInfos(); + int partitionColumnCount = vectorizedRowBatchCtx.getPartitionColumnCount(); + String[] scratchColumnTypeNames =vectorizedRowBatchCtx.getScratchColumnTypeNames(); - LOG.debug("debugDisplayAllMaps columnNameMap " + columnNameMap.toString()); - LOG.debug("debugDisplayAllMaps columnTypeMap " + columnTypeMap.toString()); - LOG.debug("debugDisplayAllMaps scratchColumnTypeMap " + scratchColumnTypeMap.toString()); + LOG.debug("debugDisplayAllMaps columnNames " + Arrays.toString(columnNames)); + LOG.debug("debugDisplayAllMaps columnTypeInfos " + Arrays.deepToString((Object[]) columnTypeInfos)); + LOG.debug("debugDisplayAllMaps partitionColumnCount " + partitionColumnCount); + LOG.debug("debugDisplayAllMaps scratchColumnTypeNames " + Arrays.toString(scratchColumnTypeNames)); } }
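
The per-partition loop at the top of this section (inside validateInputFormatAndSchemaEvolution) validates each distinct (column names, column types) combination once and caches the resulting conversion flags, so subsequent partitions with an identical schema are accepted without re-checking. Below is a minimal, self-contained sketch of that caching pattern, assuming simplified types; the class name, the stand-in conversion check, and the JDK SimpleImmutableEntry key are illustrative choices, not the actual Hive code.

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionSchemaCheckSketch {

  // A null value means "no conversion needed"; a boolean[] marks the columns that need conversion.
  private final Map<SimpleImmutableEntry<List<String>, List<String>>, boolean[]> conversionMap =
      new HashMap<>();

  private final List<String> tableColumnNames;
  private final List<String> tableColumnTypes;

  public PartitionSchemaCheckSketch(List<String> tableColumnNames, List<String> tableColumnTypes) {
    this.tableColumnNames = tableColumnNames;
    this.tableColumnTypes = tableColumnTypes;
  }

  /** Returns true if the partition's schema is acceptable for vectorization. */
  public boolean validatePartition(String path, List<String> partColumnNames,
      List<String> partColumnTypes) {
    SimpleImmutableEntry<List<String>, List<String>> key =
        new SimpleImmutableEntry<>(partColumnNames, partColumnTypes);
    if (conversionMap.containsKey(key)) {
      // An identical (names, types) combination was already validated; nothing to recheck.
      return true;
    }
    // A partition may have fewer data columns than the table (missing ones read as NULL),
    // but never more.
    if (partColumnNames.size() > tableColumnNames.size()) {
      System.out.printf("Could not vectorize partition %s: %d partition columns > %d table columns%n",
          path, partColumnNames.size(), tableColumnNames.size());
      return false;
    }
    // The columns that are present must line up with the table columns by name.
    for (int i = 0; i < partColumnNames.size(); i++) {
      if (!partColumnNames.get(i).equals(tableColumnNames.get(i))) {
        System.out.printf("Could not vectorize partition %s: column %s does not match table column %s%n",
            path, partColumnNames.get(i), tableColumnNames.get(i));
        return false;
      }
    }
    // Stand-in for the data type conversion check: null means no conversion is required,
    // otherwise flag every position whose type differs from the table's type.
    boolean[] conversionFlags = null;
    if (!partColumnTypes.equals(tableColumnTypes.subList(0, partColumnTypes.size()))) {
      conversionFlags = new boolean[partColumnTypes.size()];
      for (int i = 0; i < partColumnTypes.size(); i++) {
        conversionFlags[i] = !partColumnTypes.get(i).equals(tableColumnTypes.get(i));
      }
    }
    // Remember the result so later partitions with the same schema skip the checks above.
    conversionMap.put(key, conversionFlags);
    return true;
  }

  public static void main(String[] args) {
    PartitionSchemaCheckSketch sketch = new PartitionSchemaCheckSketch(
        Arrays.asList("id", "name", "ts"), Arrays.asList("int", "string", "timestamp"));
    System.out.println(sketch.validatePartition("/warehouse/t/p=1",
        Arrays.asList("id", "name"), Arrays.asList("int", "string")));
  }
}

Keying the cache on the (names, types) combination rather than on the partition path is what keeps validation cheap for tables with many partitions but only a handful of distinct schemas.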
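The other thread running through these hunks is the replacement of the three per-work maps (vector column name map, column type map, scratch column type map) with a single VectorTaskColumnInfo that is populated during validation, given the scratch column type names after vectorization, and then handed to the work object in one call. Since VectorTaskColumnInfo and BaseWork are only partially visible in this diff, the following is a hedged, simplified sketch of that "collect once, transfer once" shape; WorkStub, setTypeNames, and transferToWork are illustrative stand-ins, not Hive's actual names.

import java.util.ArrayList;
import java.util.List;

public class TaskColumnInfoSketch {

  // Stand-in for what Hive keeps per work object (roughly BaseWork plus its VectorizedRowBatchCtx).
  public static class WorkStub {
    String[] rowColumnNames;
    String[] rowColumnTypeNames;
    int partitionColumnCount;
    String[] scratchColumnTypeNames;
  }

  private final List<String> columnNames = new ArrayList<>();
  private final List<String> typeNames = new ArrayList<>();
  private int partitionColumnCount;
  private String[] scratchTypeNames = new String[0];

  // Filled in during the validate pass.
  public void setColumnNames(List<String> names) {
    columnNames.clear();
    columnNames.addAll(names);
  }

  public void setTypeNames(List<String> types) {
    typeNames.clear();
    typeNames.addAll(types);
  }

  public void setPartitionColumnCount(int count) {
    partitionColumnCount = count;
  }

  // Filled in during the vectorize pass, from the scratch columns the vectorization context created.
  public void setScratchTypeNameArray(String[] scratchTypeNames) {
    this.scratchTypeNames = scratchTypeNames;
  }

  // One hand-off to the work object, replacing the old trio of per-map setters.
  public void transferToWork(WorkStub work) {
    work.rowColumnNames = columnNames.toArray(new String[0]);
    work.rowColumnTypeNames = typeNames.toArray(new String[0]);
    work.partitionColumnCount = partitionColumnCount;
    work.scratchColumnTypeNames = scratchTypeNames;
  }
}

Collecting everything in one carrier keeps the validate and vectorize passes from each mutating the work object separately; in the patch, transferToBaseWork presumably builds the VectorizedRowBatchCtx that debugDisplayAllMaps now reads back, in a single consistent step.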
