http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java index b654b64..aabea0b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java @@ -17,11 +17,29 @@ */ package org.apache.hadoop.hive.ql.io.orc; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; + +import com.google.common.collect.Lists; public class OrcUtils { private static final Logger LOG = LoggerFactory.getLogger(OrcUtils.class); @@ -79,4 +97,533 @@ public class OrcUtils { } return null; } + + /** + * Convert a Hive type property string that contains separated type names into a list of + * TypeDescription objects. + * @return the list of TypeDescription objects. + */ + public static ArrayList<TypeDescription> typeDescriptionsFromHiveTypeProperty( + String hiveTypeProperty) { + + // CONSDIER: We need a type name parser for TypeDescription. + + ArrayList<TypeInfo> typeInfoList = TypeInfoUtils.getTypeInfosFromTypeString(hiveTypeProperty); + ArrayList<TypeDescription> typeDescrList =new ArrayList<TypeDescription>(typeInfoList.size()); + for (TypeInfo typeInfo : typeInfoList) { + typeDescrList.add(convertTypeInfo(typeInfo)); + } + return typeDescrList; + } + + public static TypeDescription convertTypeInfo(TypeInfo info) { + switch (info.getCategory()) { + case PRIMITIVE: { + PrimitiveTypeInfo pinfo = (PrimitiveTypeInfo) info; + switch (pinfo.getPrimitiveCategory()) { + case BOOLEAN: + return TypeDescription.createBoolean(); + case BYTE: + return TypeDescription.createByte(); + case SHORT: + return TypeDescription.createShort(); + case INT: + return TypeDescription.createInt(); + case LONG: + return TypeDescription.createLong(); + case FLOAT: + return TypeDescription.createFloat(); + case DOUBLE: + return TypeDescription.createDouble(); + case STRING: + return TypeDescription.createString(); + case DATE: + return TypeDescription.createDate(); + case TIMESTAMP: + return TypeDescription.createTimestamp(); + case BINARY: + return TypeDescription.createBinary(); + case DECIMAL: { + DecimalTypeInfo dinfo = (DecimalTypeInfo) pinfo; + return TypeDescription.createDecimal() + .withScale(dinfo.getScale()) + .withPrecision(dinfo.getPrecision()); + } + case VARCHAR: { + BaseCharTypeInfo cinfo = (BaseCharTypeInfo) pinfo; + return TypeDescription.createVarchar() + .withMaxLength(cinfo.getLength()); + } + case CHAR: { + BaseCharTypeInfo cinfo = (BaseCharTypeInfo) pinfo; + return TypeDescription.createChar() + .withMaxLength(cinfo.getLength()); + } + default: + throw new IllegalArgumentException("ORC doesn't handle primitive" + + " category " + pinfo.getPrimitiveCategory()); + } + } + case LIST: { + ListTypeInfo linfo = (ListTypeInfo) info; + return TypeDescription.createList + (convertTypeInfo(linfo.getListElementTypeInfo())); + } + case MAP: { + MapTypeInfo minfo = (MapTypeInfo) info; + return TypeDescription.createMap + (convertTypeInfo(minfo.getMapKeyTypeInfo()), + convertTypeInfo(minfo.getMapValueTypeInfo())); + } + case UNION: { + UnionTypeInfo minfo = (UnionTypeInfo) info; + TypeDescription result = TypeDescription.createUnion(); + for (TypeInfo child: minfo.getAllUnionObjectTypeInfos()) { + result.addUnionChild(convertTypeInfo(child)); + } + return result; + } + case STRUCT: { + StructTypeInfo sinfo = (StructTypeInfo) info; + TypeDescription result = TypeDescription.createStruct(); + for(String fieldName: sinfo.getAllStructFieldNames()) { + result.addField(fieldName, + convertTypeInfo(sinfo.getStructFieldTypeInfo(fieldName))); + } + return result; + } + default: + throw new IllegalArgumentException("ORC doesn't handle " + + info.getCategory()); + } + } + + public static List<OrcProto.Type> getOrcTypes(TypeDescription typeDescr) { + List<OrcProto.Type> result = Lists.newArrayList(); + appendOrcTypes(result, typeDescr); + return result; + } + + private static void appendOrcTypes(List<OrcProto.Type> result, TypeDescription typeDescr) { + OrcProto.Type.Builder type = OrcProto.Type.newBuilder(); + List<TypeDescription> children = typeDescr.getChildren(); + switch (typeDescr.getCategory()) { + case BOOLEAN: + type.setKind(OrcProto.Type.Kind.BOOLEAN); + break; + case BYTE: + type.setKind(OrcProto.Type.Kind.BYTE); + break; + case SHORT: + type.setKind(OrcProto.Type.Kind.SHORT); + break; + case INT: + type.setKind(OrcProto.Type.Kind.INT); + break; + case LONG: + type.setKind(OrcProto.Type.Kind.LONG); + break; + case FLOAT: + type.setKind(OrcProto.Type.Kind.FLOAT); + break; + case DOUBLE: + type.setKind(OrcProto.Type.Kind.DOUBLE); + break; + case STRING: + type.setKind(OrcProto.Type.Kind.STRING); + break; + case CHAR: + type.setKind(OrcProto.Type.Kind.CHAR); + type.setMaximumLength(typeDescr.getMaxLength()); + break; + case VARCHAR: + type.setKind(Type.Kind.VARCHAR); + type.setMaximumLength(typeDescr.getMaxLength()); + break; + case BINARY: + type.setKind(OrcProto.Type.Kind.BINARY); + break; + case TIMESTAMP: + type.setKind(OrcProto.Type.Kind.TIMESTAMP); + break; + case DATE: + type.setKind(OrcProto.Type.Kind.DATE); + break; + case DECIMAL: + type.setKind(OrcProto.Type.Kind.DECIMAL); + type.setPrecision(typeDescr.getPrecision()); + type.setScale(typeDescr.getScale()); + break; + case LIST: + type.setKind(OrcProto.Type.Kind.LIST); + type.addSubtypes(children.get(0).getId()); + break; + case MAP: + type.setKind(OrcProto.Type.Kind.MAP); + for(TypeDescription t: children) { + type.addSubtypes(t.getId()); + } + break; + case STRUCT: + type.setKind(OrcProto.Type.Kind.STRUCT); + for(TypeDescription t: children) { + type.addSubtypes(t.getId()); + } + for(String field: typeDescr.getFieldNames()) { + type.addFieldNames(field); + } + break; + case UNION: + type.setKind(OrcProto.Type.Kind.UNION); + for(TypeDescription t: children) { + type.addSubtypes(t.getId()); + } + break; + default: + throw new IllegalArgumentException("Unknown category: " + + typeDescr.getCategory()); + } + result.add(type.build()); + if (children != null) { + for(TypeDescription child: children) { + appendOrcTypes(result, child); + } + } + } + + /** + * NOTE: This method ignores the subtype numbers in the TypeDescription rebuilds the subtype + * numbers based on the length of the result list being appended. + * + * @param result + * @param typeInfo + */ + public static void appendOrcTypesRebuildSubtypes(List<OrcProto.Type> result, + TypeDescription typeDescr) { + + int subtype = result.size(); + OrcProto.Type.Builder type = OrcProto.Type.newBuilder(); + boolean needsAdd = true; + List<TypeDescription> children = typeDescr.getChildren(); + switch (typeDescr.getCategory()) { + case BOOLEAN: + type.setKind(OrcProto.Type.Kind.BOOLEAN); + break; + case BYTE: + type.setKind(OrcProto.Type.Kind.BYTE); + break; + case SHORT: + type.setKind(OrcProto.Type.Kind.SHORT); + break; + case INT: + type.setKind(OrcProto.Type.Kind.INT); + break; + case LONG: + type.setKind(OrcProto.Type.Kind.LONG); + break; + case FLOAT: + type.setKind(OrcProto.Type.Kind.FLOAT); + break; + case DOUBLE: + type.setKind(OrcProto.Type.Kind.DOUBLE); + break; + case STRING: + type.setKind(OrcProto.Type.Kind.STRING); + break; + case CHAR: + type.setKind(OrcProto.Type.Kind.CHAR); + type.setMaximumLength(typeDescr.getMaxLength()); + break; + case VARCHAR: + type.setKind(Type.Kind.VARCHAR); + type.setMaximumLength(typeDescr.getMaxLength()); + break; + case BINARY: + type.setKind(OrcProto.Type.Kind.BINARY); + break; + case TIMESTAMP: + type.setKind(OrcProto.Type.Kind.TIMESTAMP); + break; + case DATE: + type.setKind(OrcProto.Type.Kind.DATE); + break; + case DECIMAL: + type.setKind(OrcProto.Type.Kind.DECIMAL); + type.setPrecision(typeDescr.getPrecision()); + type.setScale(typeDescr.getScale()); + break; + case LIST: + type.setKind(OrcProto.Type.Kind.LIST); + type.addSubtypes(++subtype); + result.add(type.build()); + needsAdd = false; + appendOrcTypesRebuildSubtypes(result, children.get(0)); + break; + case MAP: + { + // Make room for MAP type. + result.add(null); + + // Add MAP type pair in order to determine their subtype values. + appendOrcTypesRebuildSubtypes(result, children.get(0)); + int subtype2 = result.size(); + appendOrcTypesRebuildSubtypes(result, children.get(1)); + type.setKind(OrcProto.Type.Kind.MAP); + type.addSubtypes(subtype + 1); + type.addSubtypes(subtype2); + result.set(subtype, type.build()); + needsAdd = false; + } + break; + case STRUCT: + { + List<String> fieldNames = typeDescr.getFieldNames(); + + // Make room for STRUCT type. + result.add(null); + + List<Integer> fieldSubtypes = new ArrayList<Integer>(fieldNames.size()); + for(TypeDescription child: children) { + int fieldSubtype = result.size(); + fieldSubtypes.add(fieldSubtype); + appendOrcTypesRebuildSubtypes(result, child); + } + + type.setKind(OrcProto.Type.Kind.STRUCT); + + for (int i = 0 ; i < fieldNames.size(); i++) { + type.addSubtypes(fieldSubtypes.get(i)); + type.addFieldNames(fieldNames.get(i)); + } + result.set(subtype, type.build()); + needsAdd = false; + } + break; + case UNION: + { + // Make room for UNION type. + result.add(null); + + List<Integer> unionSubtypes = new ArrayList<Integer>(children.size()); + for(TypeDescription child: children) { + int unionSubtype = result.size(); + unionSubtypes.add(unionSubtype); + appendOrcTypesRebuildSubtypes(result, child); + } + + type.setKind(OrcProto.Type.Kind.UNION); + for (int i = 0 ; i < children.size(); i++) { + type.addSubtypes(unionSubtypes.get(i)); + } + result.set(subtype, type.build()); + needsAdd = false; + } + break; + default: + throw new IllegalArgumentException("Unknown category: " + typeDescr.getCategory()); + } + if (needsAdd) { + result.add(type.build()); + } + } + + /** + * NOTE: This method ignores the subtype numbers in the OrcProto.Type rebuilds the subtype + * numbers based on the length of the result list being appended. + * + * @param result + * @param typeInfo + */ + public static int appendOrcTypesRebuildSubtypes(List<OrcProto.Type> result, + List<OrcProto.Type> types, int columnId) { + + OrcProto.Type oldType = types.get(columnId++); + + int subtype = result.size(); + OrcProto.Type.Builder builder = OrcProto.Type.newBuilder(); + boolean needsAdd = true; + switch (oldType.getKind()) { + case BOOLEAN: + builder.setKind(OrcProto.Type.Kind.BOOLEAN); + break; + case BYTE: + builder.setKind(OrcProto.Type.Kind.BYTE); + break; + case SHORT: + builder.setKind(OrcProto.Type.Kind.SHORT); + break; + case INT: + builder.setKind(OrcProto.Type.Kind.INT); + break; + case LONG: + builder.setKind(OrcProto.Type.Kind.LONG); + break; + case FLOAT: + builder.setKind(OrcProto.Type.Kind.FLOAT); + break; + case DOUBLE: + builder.setKind(OrcProto.Type.Kind.DOUBLE); + break; + case STRING: + builder.setKind(OrcProto.Type.Kind.STRING); + break; + case CHAR: + builder.setKind(OrcProto.Type.Kind.CHAR); + builder.setMaximumLength(oldType.getMaximumLength()); + break; + case VARCHAR: + builder.setKind(Type.Kind.VARCHAR); + builder.setMaximumLength(oldType.getMaximumLength()); + break; + case BINARY: + builder.setKind(OrcProto.Type.Kind.BINARY); + break; + case TIMESTAMP: + builder.setKind(OrcProto.Type.Kind.TIMESTAMP); + break; + case DATE: + builder.setKind(OrcProto.Type.Kind.DATE); + break; + case DECIMAL: + builder.setKind(OrcProto.Type.Kind.DECIMAL); + builder.setPrecision(oldType.getPrecision()); + builder.setScale(oldType.getScale()); + break; + case LIST: + builder.setKind(OrcProto.Type.Kind.LIST); + builder.addSubtypes(++subtype); + result.add(builder.build()); + needsAdd = false; + columnId = appendOrcTypesRebuildSubtypes(result, types, columnId); + break; + case MAP: + { + // Make room for MAP type. + result.add(null); + + // Add MAP type pair in order to determine their subtype values. + columnId = appendOrcTypesRebuildSubtypes(result, types, columnId); + int subtype2 = result.size(); + columnId = appendOrcTypesRebuildSubtypes(result, types, columnId); + builder.setKind(OrcProto.Type.Kind.MAP); + builder.addSubtypes(subtype + 1); + builder.addSubtypes(subtype2); + result.set(subtype, builder.build()); + needsAdd = false; + } + break; + case STRUCT: + { + List<String> fieldNames = oldType.getFieldNamesList(); + + // Make room for STRUCT type. + result.add(null); + + List<Integer> fieldSubtypes = new ArrayList<Integer>(fieldNames.size()); + for(int i = 0 ; i < fieldNames.size(); i++) { + int fieldSubtype = result.size(); + fieldSubtypes.add(fieldSubtype); + columnId = appendOrcTypesRebuildSubtypes(result, types, columnId); + } + + builder.setKind(OrcProto.Type.Kind.STRUCT); + + for (int i = 0 ; i < fieldNames.size(); i++) { + builder.addSubtypes(fieldSubtypes.get(i)); + builder.addFieldNames(fieldNames.get(i)); + } + result.set(subtype, builder.build()); + needsAdd = false; + } + break; + case UNION: + { + int subtypeCount = oldType.getSubtypesCount(); + + // Make room for UNION type. + result.add(null); + + List<Integer> unionSubtypes = new ArrayList<Integer>(subtypeCount); + for(int i = 0 ; i < subtypeCount; i++) { + int unionSubtype = result.size(); + unionSubtypes.add(unionSubtype); + columnId = appendOrcTypesRebuildSubtypes(result, types, columnId); + } + + builder.setKind(OrcProto.Type.Kind.UNION); + for (int i = 0 ; i < subtypeCount; i++) { + builder.addSubtypes(unionSubtypes.get(i)); + } + result.set(subtype, builder.build()); + needsAdd = false; + } + break; + default: + throw new IllegalArgumentException("Unknown category: " + oldType.getKind()); + } + if (needsAdd) { + result.add(builder.build()); + } + return columnId; + } + + public static TypeDescription getDesiredRowTypeDescr(Configuration conf) { + + String columnNameProperty = null; + String columnTypeProperty = null; + + ArrayList<String> schemaEvolutionColumnNames = null; + ArrayList<TypeDescription> schemaEvolutionTypeDescrs = null; + + boolean haveSchemaEvolutionProperties = false; + if (HiveConf.getBoolVar(conf, ConfVars.HIVE_SCHEMA_EVOLUTION)) { + + columnNameProperty = conf.get(IOConstants.SCHEMA_EVOLUTION_COLUMNS); + columnTypeProperty = conf.get(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES); + + haveSchemaEvolutionProperties = + (columnNameProperty != null && columnTypeProperty != null); + + if (haveSchemaEvolutionProperties) { + schemaEvolutionColumnNames = Lists.newArrayList(columnNameProperty.split(",")); + if (schemaEvolutionColumnNames.size() == 0) { + haveSchemaEvolutionProperties = false; + } else { + schemaEvolutionTypeDescrs = + OrcUtils.typeDescriptionsFromHiveTypeProperty(columnTypeProperty); + if (schemaEvolutionTypeDescrs.size() != schemaEvolutionColumnNames.size()) { + haveSchemaEvolutionProperties = false; + } + } + } + } + + if (!haveSchemaEvolutionProperties) { + + // Try regular properties; + columnNameProperty = conf.get(serdeConstants.LIST_COLUMNS); + columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES); + if (columnTypeProperty == null || columnNameProperty == null) { + return null; + } + + schemaEvolutionColumnNames = Lists.newArrayList(columnNameProperty.split(",")); + if (schemaEvolutionColumnNames.size() == 0) { + return null; + } + schemaEvolutionTypeDescrs = + OrcUtils.typeDescriptionsFromHiveTypeProperty(columnTypeProperty); + if (schemaEvolutionTypeDescrs.size() != schemaEvolutionColumnNames.size()) { + return null; + } + } + + // Desired schema does not include virtual columns or partition columns. + TypeDescription result = TypeDescription.createStruct(); + for (int i = 0; i < schemaEvolutionColumnNames.size(); i++) { + result.addField(schemaEvolutionColumnNames.get(i), schemaEvolutionTypeDescrs.get(i)); + } + + return result; + } }
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java index cf81782..750cf8d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -145,6 +146,7 @@ public interface Reader { private boolean[] include; private long offset = 0; private long length = Long.MAX_VALUE; + private TypeDescription schema; private SearchArgument sarg = null; private String[] columnNames = null; private Boolean useZeroCopy = null; @@ -173,6 +175,14 @@ public interface Reader { } /** + * Set the schema on read type description. + */ + public Options schema(TypeDescription schema) { + this.schema = schema; + return this; + } + + /** * Set search argument for predicate push down. * @param sarg the search argument * @param columnNames the column names for @@ -216,6 +226,10 @@ public interface Reader { return length; } + public TypeDescription getSchema() { + return schema; + } + public SearchArgument getSearchArgument() { return sarg; } @@ -245,6 +259,7 @@ public interface Reader { result.include = include; result.offset = offset; result.length = length; + result.schema = schema; result.sarg = sarg; result.columnNames = columnNames; result.useZeroCopy = useZeroCopy; http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderFactory.java deleted file mode 100644 index 5e7d636..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderFactory.java +++ /dev/null @@ -1,274 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.io.orc; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector; -import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; -import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; - -import com.google.common.collect.Lists; - -/** - * Factory to create ORC tree readers. It also compares file schema with schema specified on read - * to see if type promotions are possible. - */ -public class RecordReaderFactory { - static final Logger LOG = LoggerFactory.getLogger(RecordReaderFactory.class); - private static final boolean isLogInfoEnabled = LOG.isInfoEnabled(); - - public static TreeReaderFactory.TreeReader createTreeReader(int colId, - Configuration conf, - List<OrcProto.Type> fileSchema, - boolean[] included, - boolean skipCorrupt) throws IOException { - final boolean isAcid = checkAcidSchema(fileSchema); - final List<OrcProto.Type> originalFileSchema; - if (isAcid) { - originalFileSchema = fileSchema.subList(fileSchema.get(0).getSubtypesCount(), - fileSchema.size()); - } else { - originalFileSchema = fileSchema; - } - final int numCols = originalFileSchema.get(0).getSubtypesCount(); - List<OrcProto.Type> schemaOnRead = getSchemaOnRead(numCols, conf); - List<OrcProto.Type> schemaUsed = getMatchingSchema(fileSchema, schemaOnRead); - if (schemaUsed == null) { - return TreeReaderFactory.createTreeReader(colId, fileSchema, included, skipCorrupt); - } else { - return ConversionTreeReaderFactory.createTreeReader(colId, schemaUsed, included, skipCorrupt); - } - } - - static List<String> getAcidEventFields() { - return Lists.newArrayList("operation", "originalTransaction", "bucket", - "rowId", "currentTransaction", "row"); - } - - private static boolean checkAcidSchema(List<OrcProto.Type> fileSchema) { - if (fileSchema.get(0).getKind().equals(OrcProto.Type.Kind.STRUCT)) { - List<String> acidFields = getAcidEventFields(); - List<String> rootFields = fileSchema.get(0).getFieldNamesList(); - if (acidFields.equals(rootFields)) { - return true; - } - } - return false; - } - - private static List<OrcProto.Type> getMatchingSchema(List<OrcProto.Type> fileSchema, - List<OrcProto.Type> schemaOnRead) { - if (schemaOnRead == null) { - if (isLogInfoEnabled) { - LOG.info("Schema is not specified on read. Using file schema."); - } - return null; - } - - if (fileSchema.size() != schemaOnRead.size()) { - if (isLogInfoEnabled) { - LOG.info("Schema on read column count does not match file schema's column count." + - " Falling back to using file schema."); - } - return null; - } else { - List<OrcProto.Type> result = Lists.newArrayList(fileSchema); - // check type promotion. ORC can only support type promotions for integer types - // short -> int -> bigint as same integer readers are used for the above types. - boolean canPromoteType = false; - for (int i = 0; i < fileSchema.size(); i++) { - OrcProto.Type fColType = fileSchema.get(i); - OrcProto.Type rColType = schemaOnRead.get(i); - if (!fColType.getKind().equals(rColType.getKind())) { - - if (fColType.getKind().equals(OrcProto.Type.Kind.SHORT)) { - - if (rColType.getKind().equals(OrcProto.Type.Kind.INT) || - rColType.getKind().equals(OrcProto.Type.Kind.LONG)) { - // type promotion possible, converting SHORT to INT/LONG requested type - result.set(i, result.get(i).toBuilder().setKind(rColType.getKind()).build()); - canPromoteType = true; - } else { - canPromoteType = false; - } - - } else if (fColType.getKind().equals(OrcProto.Type.Kind.INT)) { - - if (rColType.getKind().equals(OrcProto.Type.Kind.LONG)) { - // type promotion possible, converting INT to LONG requested type - result.set(i, result.get(i).toBuilder().setKind(rColType.getKind()).build()); - canPromoteType = true; - } else { - canPromoteType = false; - } - - } else { - canPromoteType = false; - } - } - } - - if (canPromoteType) { - if (isLogInfoEnabled) { - LOG.info("Integer type promotion happened in ORC record reader. Using promoted schema."); - } - return result; - } - } - - return null; - } - - private static List<OrcProto.Type> getSchemaOnRead(int numCols, Configuration conf) { - String columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES); - final String columnNameProperty = conf.get(serdeConstants.LIST_COLUMNS); - if (columnTypeProperty == null || columnNameProperty == null) { - return null; - } - - ArrayList<String> columnNames = Lists.newArrayList(columnNameProperty.split(",")); - ArrayList<TypeInfo> fieldTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); - StructTypeInfo structTypeInfo = new StructTypeInfo(); - // Column types from conf includes virtual and partition columns at the end. We consider only - // the actual columns in the file. - structTypeInfo.setAllStructFieldNames(Lists.newArrayList(columnNames.subList(0, numCols))); - structTypeInfo.setAllStructFieldTypeInfos(Lists.newArrayList(fieldTypes.subList(0, numCols))); - ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(structTypeInfo); - return getOrcTypes(oi); - } - - private static List<OrcProto.Type> getOrcTypes(ObjectInspector inspector) { - List<OrcProto.Type> result = Lists.newArrayList(); - getOrcTypesImpl(result, inspector); - return result; - } - - private static void getOrcTypesImpl(List<OrcProto.Type> result, ObjectInspector inspector) { - OrcProto.Type.Builder type = OrcProto.Type.newBuilder(); - switch (inspector.getCategory()) { - case PRIMITIVE: - switch (((PrimitiveObjectInspector) inspector).getPrimitiveCategory()) { - case BOOLEAN: - type.setKind(OrcProto.Type.Kind.BOOLEAN); - break; - case BYTE: - type.setKind(OrcProto.Type.Kind.BYTE); - break; - case SHORT: - type.setKind(OrcProto.Type.Kind.SHORT); - break; - case INT: - type.setKind(OrcProto.Type.Kind.INT); - break; - case LONG: - type.setKind(OrcProto.Type.Kind.LONG); - break; - case FLOAT: - type.setKind(OrcProto.Type.Kind.FLOAT); - break; - case DOUBLE: - type.setKind(OrcProto.Type.Kind.DOUBLE); - break; - case STRING: - type.setKind(OrcProto.Type.Kind.STRING); - break; - case CHAR: - // The char length needs to be written to file and should be available - // from the object inspector - CharTypeInfo charTypeInfo = (CharTypeInfo) ((PrimitiveObjectInspector) inspector) - .getTypeInfo(); - type.setKind(OrcProto.Type.Kind.CHAR); - type.setMaximumLength(charTypeInfo.getLength()); - break; - case VARCHAR: - // The varchar length needs to be written to file and should be available - // from the object inspector - VarcharTypeInfo typeInfo = (VarcharTypeInfo) ((PrimitiveObjectInspector) inspector) - .getTypeInfo(); - type.setKind(OrcProto.Type.Kind.VARCHAR); - type.setMaximumLength(typeInfo.getLength()); - break; - case BINARY: - type.setKind(OrcProto.Type.Kind.BINARY); - break; - case TIMESTAMP: - type.setKind(OrcProto.Type.Kind.TIMESTAMP); - break; - case DATE: - type.setKind(OrcProto.Type.Kind.DATE); - break; - case DECIMAL: - DecimalTypeInfo decTypeInfo = (DecimalTypeInfo) ((PrimitiveObjectInspector) inspector) - .getTypeInfo(); - type.setKind(OrcProto.Type.Kind.DECIMAL); - type.setPrecision(decTypeInfo.precision()); - type.setScale(decTypeInfo.scale()); - break; - default: - throw new IllegalArgumentException("Unknown primitive category: " + - ((PrimitiveObjectInspector) inspector).getPrimitiveCategory()); - } - result.add(type.build()); - break; - case LIST: - type.setKind(OrcProto.Type.Kind.LIST); - result.add(type.build()); - getOrcTypesImpl(result, ((ListObjectInspector) inspector).getListElementObjectInspector()); - break; - case MAP: - type.setKind(OrcProto.Type.Kind.MAP); - result.add(type.build()); - getOrcTypesImpl(result, ((MapObjectInspector) inspector).getMapKeyObjectInspector()); - getOrcTypesImpl(result, ((MapObjectInspector) inspector).getMapValueObjectInspector()); - break; - case STRUCT: - type.setKind(OrcProto.Type.Kind.STRUCT); - result.add(type.build()); - for (StructField field : ((StructObjectInspector) inspector).getAllStructFieldRefs()) { - getOrcTypesImpl(result, field.getFieldObjectInspector()); - } - break; - case UNION: - type.setKind(OrcProto.Type.Kind.UNION); - result.add(type.build()); - for (ObjectInspector oi : ((UnionObjectInspector) inspector).getObjectInspectors()) { - getOrcTypesImpl(result, oi); - } - break; - default: - throw new IllegalArgumentException("Unknown category: " + inspector.getCategory()); - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java index 84d627a..7f550a4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java @@ -42,6 +42,8 @@ import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO; import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry; +import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type; +import org.apache.hadoop.hive.ql.io.orc.TreeReaderFactory.TreeReaderSchema; import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue; @@ -153,6 +155,18 @@ public class RecordReaderImpl implements RecordReader { long strideRate, Configuration conf ) throws IOException { + + TreeReaderSchema treeReaderSchema; + if (options.getSchema() == null) { + treeReaderSchema = new TreeReaderSchema().fileTypes(types).schemaTypes(types); + } else { + + // Now that we are creating a record reader for a file, validate that the schema to read + // is compatible with the file schema. + // + List<Type> schemaTypes = OrcUtils.getOrcTypes(options.getSchema()); + treeReaderSchema = SchemaEvolution.validateAndCreate(types, schemaTypes); + } this.path = path; this.codec = codec; this.types = types; @@ -197,7 +211,7 @@ public class RecordReaderImpl implements RecordReader { skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf); } - reader = RecordReaderFactory.createTreeReader(0, conf, types, included, skipCorrupt); + reader = TreeReaderFactory.createTreeReader(0, treeReaderSchema, included, skipCorrupt); indexes = new OrcProto.RowIndex[types.size()]; bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()]; advanceToNextRow(reader, 0L, true); @@ -1101,6 +1115,7 @@ public class RecordReaderImpl implements RecordReader { } else { result = (VectorizedRowBatch) previous; result.selectedInUse = false; + reader.setVectorColumnCount(result.getDataColumnCount()); reader.nextVector(result.cols, (int) batchSize); } http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java new file mode 100644 index 0000000..9d00eb2 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type; +import org.apache.hadoop.hive.ql.io.orc.TreeReaderFactory.TreeReaderSchema; + +/** + * Take the file types and the (optional) configuration column names/types and see if there + * has been schema evolution. + */ +public class SchemaEvolution { + + private static final Log LOG = LogFactory.getLog(SchemaEvolution.class); + + public static TreeReaderSchema validateAndCreate(List<OrcProto.Type> fileTypes, + List<OrcProto.Type> schemaTypes) throws IOException { + + // For ACID, the row is the ROW field in the outer STRUCT. + final boolean isAcid = checkAcidSchema(fileTypes); + final List<OrcProto.Type> rowSchema; + int rowSubtype; + if (isAcid) { + rowSubtype = OrcRecordUpdater.ROW + 1; + rowSchema = fileTypes.subList(rowSubtype, fileTypes.size()); + } else { + rowSubtype = 0; + rowSchema = fileTypes; + } + + // Do checking on the overlap. Additional columns will be defaulted to NULL. + + int numFileColumns = rowSchema.get(0).getSubtypesCount(); + int numDesiredColumns = schemaTypes.get(0).getSubtypesCount(); + + int numReadColumns = Math.min(numFileColumns, numDesiredColumns); + + /** + * Check type promotion. + * + * Currently, we only support integer type promotions that can be done "implicitly". + * That is, we know that using a bigger integer tree reader on the original smaller integer + * column will "just work". + * + * In the future, other type promotions might require type conversion. + */ + // short -> int -> bigint as same integer readers are used for the above types. + + for (int i = 0; i < numReadColumns; i++) { + OrcProto.Type fColType = fileTypes.get(rowSubtype + i); + OrcProto.Type rColType = schemaTypes.get(i); + if (!fColType.getKind().equals(rColType.getKind())) { + + boolean ok = false; + if (fColType.getKind().equals(OrcProto.Type.Kind.SHORT)) { + + if (rColType.getKind().equals(OrcProto.Type.Kind.INT) || + rColType.getKind().equals(OrcProto.Type.Kind.LONG)) { + // type promotion possible, converting SHORT to INT/LONG requested type + ok = true; + } + } else if (fColType.getKind().equals(OrcProto.Type.Kind.INT)) { + + if (rColType.getKind().equals(OrcProto.Type.Kind.LONG)) { + // type promotion possible, converting INT to LONG requested type + ok = true; + } + } + + if (!ok) { + throw new IOException("ORC does not support type conversion from " + + fColType.getKind().name() + " to " + rColType.getKind().name()); + } + } + } + + List<Type> fullSchemaTypes; + + if (isAcid) { + fullSchemaTypes = new ArrayList<OrcProto.Type>(); + + // This copies the ACID struct type which is subtype = 0. + // It has field names "operation" through "row". + // And we copy the types for all fields EXCEPT ROW (which must be last!). + + for (int i = 0; i < rowSubtype; i++) { + fullSchemaTypes.add(fileTypes.get(i).toBuilder().build()); + } + + // Add the row struct type. + OrcUtils.appendOrcTypesRebuildSubtypes(fullSchemaTypes, schemaTypes, 0); + } else { + fullSchemaTypes = schemaTypes; + } + + int innerStructSubtype = rowSubtype; + + // LOG.info("Schema evolution: (fileTypes) " + fileTypes.toString() + + // " (schemaEvolutionTypes) " + schemaEvolutionTypes.toString()); + + return new TreeReaderSchema(). + fileTypes(fileTypes). + schemaTypes(fullSchemaTypes). + innerStructSubtype(innerStructSubtype); + } + + private static boolean checkAcidSchema(List<OrcProto.Type> fileSchema) { + if (fileSchema.get(0).getKind().equals(OrcProto.Type.Kind.STRUCT)) { + List<String> rootFields = fileSchema.get(0).getFieldNamesList(); + if (acidEventFieldNames.equals(rootFields)) { + return true; + } + } + return false; + } + + /** + * @param typeDescr + * @return ORC types for the ACID event based on the row's type description + */ + public static List<Type> createEventSchema(TypeDescription typeDescr) { + + List<Type> result = new ArrayList<Type>(); + + OrcProto.Type.Builder type = OrcProto.Type.newBuilder(); + type.setKind(OrcProto.Type.Kind.STRUCT); + type.addAllFieldNames(acidEventFieldNames); + for (int i = 0; i < acidEventFieldNames.size(); i++) { + type.addSubtypes(i + 1); + } + result.add(type.build()); + + // Automatically add all fields except the last (ROW). + for (int i = 0; i < acidEventOrcTypeKinds.size() - 1; i ++) { + type.clear(); + type.setKind(acidEventOrcTypeKinds.get(i)); + result.add(type.build()); + } + + OrcUtils.appendOrcTypesRebuildSubtypes(result, typeDescr); + return result; + } + + public static final List<String> acidEventFieldNames= new ArrayList<String>(); + static { + acidEventFieldNames.add("operation"); + acidEventFieldNames.add("originalTransaction"); + acidEventFieldNames.add("bucket"); + acidEventFieldNames.add("rowId"); + acidEventFieldNames.add("currentTransaction"); + acidEventFieldNames.add("row"); + } + public static final List<OrcProto.Type.Kind> acidEventOrcTypeKinds = + new ArrayList<OrcProto.Type.Kind>(); + static { + acidEventOrcTypeKinds.add(OrcProto.Type.Kind.INT); + acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG); + acidEventOrcTypeKinds.add(OrcProto.Type.Kind.INT); + acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG); + acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG); + acidEventOrcTypeKinds.add(OrcProto.Type.Kind.STRUCT); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java index 4bcc621..8c13571 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java @@ -47,24 +47,80 @@ import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable; import org.apache.hadoop.hive.serde2.io.ShortWritable; import org.apache.hadoop.hive.serde2.io.TimestampWritable; -import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.HadoopShims.TextReaderShim; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Factory for creating ORC tree readers. */ public class TreeReaderFactory { + public static final Logger LOG = LoggerFactory.getLogger(TreeReaderFactory.class); + + public static class TreeReaderSchema { + + /** + * The types in the ORC file. + */ + List<OrcProto.Type> fileTypes; + + /** + * The treeReaderSchema that the reader should read as. + */ + List<OrcProto.Type> schemaTypes; + + /** + * The subtype of the row STRUCT. Different than 0 for ACID. + */ + int innerStructSubtype; + + public TreeReaderSchema() { + fileTypes = null; + schemaTypes = null; + innerStructSubtype = -1; + } + + public TreeReaderSchema fileTypes(List<OrcProto.Type> fileTypes) { + this.fileTypes = fileTypes; + return this; + } + + public TreeReaderSchema schemaTypes(List<OrcProto.Type> schemaTypes) { + this.schemaTypes = schemaTypes; + return this; + } + + public TreeReaderSchema innerStructSubtype(int innerStructSubtype) { + this.innerStructSubtype = innerStructSubtype; + return this; + } + + public List<OrcProto.Type> getFileTypes() { + return fileTypes; + } + + public List<OrcProto.Type> getSchemaTypes() { + return schemaTypes; + } + + public int getInnerStructSubtype() { + return innerStructSubtype; + } + } + public abstract static class TreeReader { protected final int columnId; protected BitFieldReader present = null; protected boolean valuePresent = false; + protected int vectorColumnCount; TreeReader(int columnId) throws IOException { this(columnId, null); @@ -78,6 +134,11 @@ public class TreeReaderFactory { } else { present = new BitFieldReader(in, 1); } + vectorColumnCount = -1; + } + + void setVectorColumnCount(int vectorColumnCount) { + this.vectorColumnCount = vectorColumnCount; } void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { @@ -1962,25 +2023,57 @@ public class TreeReaderFactory { } } - public static class StructTreeReader extends TreeReader { + protected static class StructTreeReader extends TreeReader { + private final int fileColumnCount; + private final int resultColumnCount; protected final TreeReader[] fields; private final String[] fieldNames; - protected StructTreeReader(int columnId, - List<OrcProto.Type> types, + protected StructTreeReader( + int columnId, + TreeReaderSchema treeReaderSchema, boolean[] included, boolean skipCorrupt) throws IOException { super(columnId); - OrcProto.Type type = types.get(columnId); - int fieldCount = type.getFieldNamesCount(); - this.fields = new TreeReader[fieldCount]; - this.fieldNames = new String[fieldCount]; - for (int i = 0; i < fieldCount; ++i) { - int subtype = type.getSubtypes(i); - if (included == null || included[subtype]) { - this.fields[i] = createTreeReader(subtype, types, included, skipCorrupt); + + OrcProto.Type fileStructType = treeReaderSchema.getFileTypes().get(columnId); + fileColumnCount = fileStructType.getFieldNamesCount(); + + OrcProto.Type schemaStructType = treeReaderSchema.getSchemaTypes().get(columnId); + + if (columnId == treeReaderSchema.getInnerStructSubtype()) { + // If there are more result columns than reader columns, we will default those additional + // columns to NULL. + resultColumnCount = schemaStructType.getFieldNamesCount(); + } else { + resultColumnCount = fileColumnCount; + } + + this.fields = new TreeReader[fileColumnCount]; + this.fieldNames = new String[fileColumnCount]; + + if (included == null) { + for (int i = 0; i < fileColumnCount; ++i) { + int subtype = schemaStructType.getSubtypes(i); + this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt); + // Use the treeReaderSchema evolution name since file/reader types may not have the real column name. + this.fieldNames[i] = schemaStructType.getFieldNames(i); + } + } else { + for (int i = 0; i < fileColumnCount; ++i) { + int subtype = schemaStructType.getSubtypes(i); + if (subtype >= included.length) { + throw new IOException("subtype " + subtype + " exceeds the included array size " + + included.length + " fileTypes " + treeReaderSchema.getFileTypes().toString() + + " schemaTypes " + treeReaderSchema.getSchemaTypes().toString() + + " innerStructSubtype " + treeReaderSchema.getInnerStructSubtype()); + } + if (included[subtype]) { + this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt); + } + // Use the treeReaderSchema evolution name since file/reader types may not have the real column name. + this.fieldNames[i] = schemaStructType.getFieldNames(i); } - this.fieldNames[i] = type.getFieldNames(i); } } @@ -2000,22 +2093,28 @@ public class TreeReaderFactory { OrcStruct result = null; if (valuePresent) { if (previous == null) { - result = new OrcStruct(fields.length); + result = new OrcStruct(resultColumnCount); } else { result = (OrcStruct) previous; // If the input format was initialized with a file with a // different number of fields, the number of fields needs to // be updated to the correct number - if (result.getNumFields() != fields.length) { - result.setNumFields(fields.length); + if (result.getNumFields() != resultColumnCount) { + result.setNumFields(resultColumnCount); } } - for (int i = 0; i < fields.length; ++i) { + for (int i = 0; i < fileColumnCount; ++i) { if (fields[i] != null) { result.setFieldValue(i, fields[i].next(result.getFieldValue(i))); } } + if (resultColumnCount > fileColumnCount) { + for (int i = fileColumnCount; i < resultColumnCount; ++i) { + // Default new treeReaderSchema evolution fields to NULL. + result.setFieldValue(i, null); + } + } } return result; } @@ -2024,13 +2123,13 @@ public class TreeReaderFactory { public Object nextVector(Object previousVector, long batchSize) throws IOException { final ColumnVector[] result; if (previousVector == null) { - result = new ColumnVector[fields.length]; + result = new ColumnVector[fileColumnCount]; } else { result = (ColumnVector[]) previousVector; } // Read all the members of struct as column vectors - for (int i = 0; i < fields.length; i++) { + for (int i = 0; i < fileColumnCount; i++) { if (fields[i] != null) { if (result[i] == null) { result[i] = (ColumnVector) fields[i].nextVector(null, batchSize); @@ -2039,6 +2138,19 @@ public class TreeReaderFactory { } } } + + // Default additional treeReaderSchema evolution fields to NULL. + if (vectorColumnCount != -1 && vectorColumnCount > fileColumnCount) { + for (int i = fileColumnCount; i < vectorColumnCount; ++i) { + ColumnVector colVector = result[i]; + if (colVector != null) { + colVector.isRepeating = true; + colVector.noNulls = false; + colVector.isNull[0] = true; + } + } + } + return result; } @@ -2070,17 +2182,17 @@ public class TreeReaderFactory { protected RunLengthByteReader tags; protected UnionTreeReader(int columnId, - List<OrcProto.Type> types, + TreeReaderSchema treeReaderSchema, boolean[] included, boolean skipCorrupt) throws IOException { super(columnId); - OrcProto.Type type = types.get(columnId); + OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId); int fieldCount = type.getSubtypesCount(); this.fields = new TreeReader[fieldCount]; for (int i = 0; i < fieldCount; ++i) { int subtype = type.getSubtypes(i); if (included == null || included[subtype]) { - this.fields[i] = createTreeReader(subtype, types, included, skipCorrupt); + this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt); } } } @@ -2150,12 +2262,12 @@ public class TreeReaderFactory { protected IntegerReader lengths = null; protected ListTreeReader(int columnId, - List<OrcProto.Type> types, + TreeReaderSchema treeReaderSchema, boolean[] included, boolean skipCorrupt) throws IOException { super(columnId); - OrcProto.Type type = types.get(columnId); - elementReader = createTreeReader(type.getSubtypes(0), types, included, skipCorrupt); + OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId); + elementReader = createTreeReader(type.getSubtypes(0), treeReaderSchema, included, skipCorrupt); } @Override @@ -2240,20 +2352,20 @@ public class TreeReaderFactory { protected IntegerReader lengths = null; protected MapTreeReader(int columnId, - List<OrcProto.Type> types, + TreeReaderSchema treeReaderSchema, boolean[] included, boolean skipCorrupt) throws IOException { super(columnId); - OrcProto.Type type = types.get(columnId); + OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId); int keyColumn = type.getSubtypes(0); int valueColumn = type.getSubtypes(1); if (included == null || included[keyColumn]) { - keyReader = createTreeReader(keyColumn, types, included, skipCorrupt); + keyReader = createTreeReader(keyColumn, treeReaderSchema, included, skipCorrupt); } else { keyReader = null; } if (included == null || included[valueColumn]) { - valueReader = createTreeReader(valueColumn, types, included, skipCorrupt); + valueReader = createTreeReader(valueColumn, treeReaderSchema, included, skipCorrupt); } else { valueReader = null; } @@ -2333,11 +2445,11 @@ public class TreeReaderFactory { } public static TreeReader createTreeReader(int columnId, - List<OrcProto.Type> types, + TreeReaderSchema treeReaderSchema, boolean[] included, boolean skipCorrupt ) throws IOException { - OrcProto.Type type = types.get(columnId); + OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId); switch (type.getKind()) { case BOOLEAN: return new BooleanTreeReader(columnId); @@ -2377,13 +2489,13 @@ public class TreeReaderFactory { int scale = type.hasScale() ? type.getScale() : HiveDecimal.SYSTEM_DEFAULT_SCALE; return new DecimalTreeReader(columnId, precision, scale); case STRUCT: - return new StructTreeReader(columnId, types, included, skipCorrupt); + return new StructTreeReader(columnId, treeReaderSchema, included, skipCorrupt); case LIST: - return new ListTreeReader(columnId, types, included, skipCorrupt); + return new ListTreeReader(columnId, treeReaderSchema, included, skipCorrupt); case MAP: - return new MapTreeReader(columnId, types, included, skipCorrupt); + return new MapTreeReader(columnId, treeReaderSchema, included, skipCorrupt); case UNION: - return new UnionTreeReader(columnId, types, included, skipCorrupt); + return new UnionTreeReader(columnId, treeReaderSchema, included, skipCorrupt); default: throw new IllegalArgumentException("Unsupported type " + type.getKind()); http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java index a8e5c2e..a2725b2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java @@ -25,7 +25,6 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.io.DataOutputBuffer; @@ -46,31 +45,25 @@ class VectorizedOrcAcidRowReader private final AcidInputFormat.RowReader<OrcStruct> innerReader; private final RecordIdentifier key; private final OrcStruct value; - private final VectorizedRowBatchCtx rowBatchCtx; + private VectorizedRowBatchCtx rbCtx; + private Object[] partitionValues; private final ObjectInspector objectInspector; private final DataOutputBuffer buffer = new DataOutputBuffer(); VectorizedOrcAcidRowReader(AcidInputFormat.RowReader<OrcStruct> inner, Configuration conf, + VectorizedRowBatchCtx vectorizedRowBatchCtx, FileSplit split) throws IOException { this.innerReader = inner; this.key = inner.createKey(); - this.rowBatchCtx = new VectorizedRowBatchCtx(); + rbCtx = vectorizedRowBatchCtx; + int partitionColumnCount = rbCtx.getPartitionColumnCount(); + if (partitionColumnCount > 0) { + partitionValues = new Object[partitionColumnCount]; + rbCtx.getPartitionValues(rbCtx, conf, split, partitionValues); + } this.value = inner.createValue(); this.objectInspector = inner.getObjectInspector(); - try { - rowBatchCtx.init(conf, split); - } catch (ClassNotFoundException e) { - throw new IOException("Failed to initialize context", e); - } catch (SerDeException e) { - throw new IOException("Failed to initialize context", e); - } catch (InstantiationException e) { - throw new IOException("Failed to initialize context", e); - } catch (IllegalAccessException e) { - throw new IOException("Failed to initialize context", e); - } catch (HiveException e) { - throw new IOException("Failed to initialize context", e); - } } @Override @@ -82,23 +75,21 @@ class VectorizedOrcAcidRowReader if (!innerReader.next(key, value)) { return false; } - try { - rowBatchCtx.addPartitionColsToBatch(vectorizedRowBatch); - } catch (HiveException e) { - throw new IOException("Problem adding partition column", e); + if (partitionValues != null) { + rbCtx.addPartitionColsToBatch(vectorizedRowBatch, partitionValues); } try { VectorizedBatchUtil.acidAddRowToBatch(value, (StructObjectInspector) objectInspector, - vectorizedRowBatch.size++, vectorizedRowBatch, rowBatchCtx, buffer); + vectorizedRowBatch.size++, vectorizedRowBatch, rbCtx, buffer); while (vectorizedRowBatch.size < vectorizedRowBatch.selected.length && innerReader.next(key, value)) { VectorizedBatchUtil.acidAddRowToBatch(value, (StructObjectInspector) objectInspector, - vectorizedRowBatch.size++, vectorizedRowBatch, rowBatchCtx, buffer); + vectorizedRowBatch.size++, vectorizedRowBatch, rbCtx, buffer); } - } catch (HiveException he) { - throw new IOException("error iterating", he); + } catch (Exception e) { + throw new IOException("error iterating", e); } return true; } @@ -110,11 +101,7 @@ class VectorizedOrcAcidRowReader @Override public VectorizedRowBatch createValue() { - try { - return rowBatchCtx.createVectorizedRowBatch(); - } catch (HiveException e) { - throw new RuntimeException("Error creating a batch", e); - } + return rbCtx.createVectorizedRowBatch(); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java index bf09001..6d280c8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.io.orc; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -27,14 +26,12 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.InputFormatChecker; -import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileSplit; @@ -47,7 +44,8 @@ import org.apache.hadoop.mapred.Reporter; * A MapReduce/Hive input format for ORC files. */ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, VectorizedRowBatch> - implements InputFormatChecker, VectorizedInputFormatInterface { + implements InputFormatChecker, VectorizedInputFormatInterface, + SelfDescribingInputFormatInterface { static class VectorizedOrcRecordReader implements RecordReader<NullWritable, VectorizedRowBatch> { @@ -56,12 +54,21 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect private final long length; private float progress = 0.0f; private VectorizedRowBatchCtx rbCtx; + private final boolean[] columnsToIncludeTruncated; + private final Object[] partitionValues; private boolean addPartitionCols = true; VectorizedOrcRecordReader(Reader file, Configuration conf, FileSplit fileSplit) throws IOException { + + /** + * Do we have schema on read in the configuration variables? + */ + TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf); + List<OrcProto.Type> types = file.getTypes(); Reader.Options options = new Reader.Options(); + options.schema(schema); this.offset = fileSplit.getStart(); this.length = fileSplit.getLength(); options.range(offset, length); @@ -69,11 +76,17 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect OrcInputFormat.setSearchArgument(options, types, conf, true); this.reader = file.rowsOptions(options); - try { - rbCtx = new VectorizedRowBatchCtx(); - rbCtx.init(conf, fileSplit); - } catch (Exception e) { - throw new RuntimeException(e); + + rbCtx = Utilities.getVectorizedRowBatchCtx(conf); + + columnsToIncludeTruncated = rbCtx.getColumnsToIncludeTruncated(conf); + + int partitionColumnCount = rbCtx.getPartitionColumnCount(); + if (partitionColumnCount > 0) { + partitionValues = new Object[partitionColumnCount]; + rbCtx.getPartitionValues(rbCtx, conf, fileSplit, partitionValues); + } else { + partitionValues = null; } } @@ -90,7 +103,9 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect // as this does not call CreateValue for each new RecordReader it creates, this check is // required in next() if (addPartitionCols) { - rbCtx.addPartitionColsToBatch(value); + if (partitionValues != null) { + rbCtx.addPartitionColsToBatch(value, partitionValues); + } addPartitionCols = false; } reader.nextBatch(value); @@ -108,11 +123,7 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect @Override public VectorizedRowBatch createValue() { - try { - return rbCtx.createVectorizedRowBatch(); - } catch (HiveException e) { - throw new RuntimeException("Error creating a batch", e); - } + return rbCtx.createVectorizedRowBatch(columnsToIncludeTruncated); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java index b28d870..2072533 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java @@ -14,8 +14,10 @@ package org.apache.hadoop.hive.ql.io.parquet; import java.io.IOException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.VectorColumnAssign; import org.apache.hadoop.hive.ql.exec.vector.VectorColumnAssignFactory; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; @@ -23,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; @@ -32,7 +35,6 @@ import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; - import org.apache.parquet.hadoop.ParquetInputFormat; /** @@ -52,6 +54,7 @@ public class VectorizedParquetInputFormat extends FileInputFormat<NullWritable, private final ParquetRecordReaderWrapper internalReader; private VectorizedRowBatchCtx rbCtx; + private Object[] partitionValues; private ArrayWritable internalValues; private NullWritable internalKey; private VectorColumnAssign[] assigners; @@ -65,11 +68,11 @@ public class VectorizedParquetInputFormat extends FileInputFormat<NullWritable, split, conf, reporter); - try { - rbCtx = new VectorizedRowBatchCtx(); - rbCtx.init(conf, split); - } catch (Exception e) { - throw new RuntimeException(e); + rbCtx = Utilities.getVectorizedRowBatchCtx(conf); + int partitionColumnCount = rbCtx.getPartitionColumnCount(); + if (partitionColumnCount > 0) { + partitionValues = new Object[partitionColumnCount]; + rbCtx.getPartitionValues(rbCtx, conf, split, partitionValues); } } @@ -81,13 +84,9 @@ public class VectorizedParquetInputFormat extends FileInputFormat<NullWritable, @Override public VectorizedRowBatch createValue() { - VectorizedRowBatch outputBatch = null; - try { - outputBatch = rbCtx.createVectorizedRowBatch(); - internalValues = internalReader.createValue(); - } catch (HiveException e) { - throw new RuntimeException("Error creating a batch", e); - } + VectorizedRowBatch outputBatch; + outputBatch = rbCtx.createVectorizedRowBatch(); + internalValues = internalReader.createValue(); return outputBatch; } http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 05dfc4b..82514d4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -111,6 +111,8 @@ import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.TezWork; import org.apache.hadoop.hive.ql.stats.StatsFactory; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.mapred.InputFormat; @@ -693,6 +695,11 @@ public final class GenMapRedUtils { parseCtx.getGlobalLimitCtx().disableOpt(); } + if (topOp instanceof TableScanOperator) { + Utilities.addSchemaEvolutionToTableScanOperator(partsList.getSourceTable(), + (TableScanOperator) topOp); + } + Iterator<Path> iterPath = partDir.iterator(); Iterator<PartitionDesc> iterPartnDesc = partDesc.iterator(); @@ -754,6 +761,7 @@ public final class GenMapRedUtils { * whether you need to add to map-reduce or local work * @param tt_desc * table descriptor + * @throws SerDeException */ public static void setTaskPlan(String path, String alias, Operator<? extends OperatorDesc> topOp, MapWork plan, boolean local, @@ -763,6 +771,16 @@ public final class GenMapRedUtils { return; } + if (topOp instanceof TableScanOperator) { + try { + Utilities.addSchemaEvolutionToTableScanOperator( + (StructObjectInspector) tt_desc.getDeserializer().getObjectInspector(), + (TableScanOperator) topOp); + } catch (Exception e) { + throw new SemanticException(e); + } + } + if (!local) { if (plan.getPathToAliases().get(path) == null) { plan.getPathToAliases().put(path, new ArrayList<String>()); http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java index 588f407..332e53b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java @@ -370,6 +370,7 @@ public class SimpleFetchOptimizer implements Transform { private FetchWork convertToWork() throws HiveException { inputs.clear(); + Utilities.addSchemaEvolutionToTableScanOperator(table, scanOp); TableDesc tableDesc = Utilities.getTableDesc(table); if (!table.isPartitioned()) { inputs.add(new ReadEntity(table, parent, !table.isView() && parent == null));
