http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java deleted file mode 100644 index 5ce7553..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java +++ /dev/null @@ -1,277 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec.vector; - -import java.nio.ByteBuffer; -import java.sql.Timestamp; -import java.util.List; - -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.serde2.ByteStream; -import org.apache.hadoop.hive.serde2.SerDe; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.SerDeStats; -import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; -import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; -import org.apache.hadoop.hive.serde2.io.DateWritable; -import org.apache.hadoop.hive.serde2.io.TimestampWritable; -import org.apache.hadoop.hive.serde2.lazy.LazyDate; -import org.apache.hadoop.hive.serde2.lazy.LazyLong; -import org.apache.hadoop.hive.serde2.lazy.LazyTimestamp; -import org.apache.hadoop.hive.serde2.lazy.LazyUtils; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.ObjectWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; - -/** - * VectorizedColumnarSerDe is used by Vectorized query execution engine - * for columnar based storage supported by RCFile. 
- */ -public class VectorizedColumnarSerDe extends ColumnarSerDe implements VectorizedSerde { - - public VectorizedColumnarSerDe() throws SerDeException { - } - - private final BytesRefArrayWritable[] byteRefArray = new BytesRefArrayWritable[VectorizedRowBatch.DEFAULT_SIZE]; - private final ObjectWritable ow = new ObjectWritable(); - private final ByteStream.Output serializeVectorStream = new ByteStream.Output(); - - /** - * Serialize a vectorized row batch - * - * @param vrg - * Vectorized row batch to serialize - * @param objInspector - * The ObjectInspector for the row object - * @return The serialized Writable object - * @throws SerDeException - * @see SerDe#serialize(Object, ObjectInspector) - */ - @Override - public Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspector) - throws SerDeException { - try { - // Validate that the OI is of struct type - if (objInspector.getCategory() != Category.STRUCT) { - throw new UnsupportedOperationException(getClass().toString() - + " can only serialize struct types, but we got: " - + objInspector.getTypeName()); - } - - VectorizedRowBatch batch = (VectorizedRowBatch) vrg; - StructObjectInspector soi = (StructObjectInspector) objInspector; - List<? extends StructField> fields = soi.getAllStructFieldRefs(); - - // Reset the byte buffer - serializeVectorStream.reset(); - int count = 0; - int rowIndex = 0; - for (int i = 0; i < batch.size; i++) { - - // If selectedInUse is true then we need to serialize only - // the selected indexes - if (batch.selectedInUse) { - rowIndex = batch.selected[i]; - } else { - rowIndex = i; - } - - BytesRefArrayWritable byteRow = byteRefArray[i]; - int numCols = fields.size(); - - if (byteRow == null) { - byteRow = new BytesRefArrayWritable(numCols); - byteRefArray[i] = byteRow; - } - - byteRow.resetValid(numCols); - - for (int p = 0; p < batch.projectionSize; p++) { - int k = batch.projectedColumns[p]; - ObjectInspector foi = fields.get(k).getFieldObjectInspector(); - ColumnVector currentColVector = batch.cols[k]; - - switch (foi.getCategory()) { - case PRIMITIVE: { - PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi; - if (!currentColVector.noNulls - && (currentColVector.isRepeating || currentColVector.isNull[rowIndex])) { - // The column is null hence write null value - serializeVectorStream.write(new byte[0], 0, 0); - } else { - // If here then the vector value is not null. - if (currentColVector.isRepeating) { - // If the vector has repeating values then set rowindex to zero - rowIndex = 0; - } - - switch (poi.getPrimitiveCategory()) { - case BOOLEAN: { - LongColumnVector lcv = (LongColumnVector) batch.cols[k]; - // In vectorization true is stored as 1 and false as 0 - boolean b = lcv.vector[rowIndex] == 1 ? 
true : false; - if (b) { - serializeVectorStream.write(LazyUtils.trueBytes, 0, LazyUtils.trueBytes.length); - } else { - serializeVectorStream.write(LazyUtils.trueBytes, 0, LazyUtils.trueBytes.length); - } - } - break; - case BYTE: - case SHORT: - case INT: - case LONG: - LongColumnVector lcv = (LongColumnVector) batch.cols[k]; - LazyLong.writeUTF8(serializeVectorStream, lcv.vector[rowIndex]); - break; - case FLOAT: - case DOUBLE: - DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[k]; - ByteBuffer b = Text.encode(String.valueOf(dcv.vector[rowIndex])); - serializeVectorStream.write(b.array(), 0, b.limit()); - break; - case BINARY: { - BytesColumnVector bcv = (BytesColumnVector) batch.cols[k]; - byte[] bytes = bcv.vector[rowIndex]; - serializeVectorStream.write(bytes, 0, bytes.length); - } - break; - case STRING: - case CHAR: - case VARCHAR: { - // Is it correct to escape CHAR and VARCHAR? - BytesColumnVector bcv = (BytesColumnVector) batch.cols[k]; - LazyUtils.writeEscaped(serializeVectorStream, bcv.vector[rowIndex], - bcv.start[rowIndex], - bcv.length[rowIndex], - serdeParams.isEscaped(), serdeParams.getEscapeChar(), serdeParams - .getNeedsEscape()); - } - break; - case TIMESTAMP: - LongColumnVector tcv = (LongColumnVector) batch.cols[k]; - long timeInNanoSec = tcv.vector[rowIndex]; - Timestamp t = new Timestamp(0); - TimestampUtils.assignTimeInNanoSec(timeInNanoSec, t); - TimestampWritable tw = new TimestampWritable(); - tw.set(t); - LazyTimestamp.writeUTF8(serializeVectorStream, tw); - break; - case DATE: - LongColumnVector dacv = (LongColumnVector) batch.cols[k]; - DateWritable daw = new DateWritable((int) dacv.vector[rowIndex]); - LazyDate.writeUTF8(serializeVectorStream, daw); - break; - default: - throw new UnsupportedOperationException( - "Vectorizaton is not supported for datatype:" - + poi.getPrimitiveCategory()); - } - } - break; - } - case LIST: - case MAP: - case STRUCT: - case UNION: - throw new UnsupportedOperationException("Vectorizaton is not supported for datatype:" - + foi.getCategory()); - default: - throw new SerDeException("Unknown ObjectInspector category!"); - - } - - byteRow.get(k).set(serializeVectorStream.getData(), count, serializeVectorStream - .getLength() - count); - count = serializeVectorStream.getLength(); - } - - } - ow.set(byteRefArray); - } catch (Exception e) { - throw new SerDeException(e); - } - return ow; - } - - @Override - public SerDeStats getSerDeStats() { - return null; - } - - @Override - public Class<? extends Writable> getSerializedClass() { - return BytesRefArrayWritable.class; - } - - @Override - public Object deserialize(Writable blob) throws SerDeException { - - // Ideally this should throw UnsupportedOperationException as the serde is - // vectorized serde. But since RC file reader does not support vectorized reading this - // is left as it is. This function will be called from VectorizedRowBatchCtx::addRowToBatch - // to deserialize the row one by one and populate the batch. Once RC file reader supports vectorized - // reading this serde and be standalone serde with no dependency on ColumnarSerDe. 
- return super.deserialize(blob); - } - - @Override - public ObjectInspector getObjectInspector() throws SerDeException { - return cachedObjectInspector; - } - - @Override - public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { - throw new UnsupportedOperationException(); - } - - /** - * Deserializes the rowBlob into Vectorized row batch - * @param rowBlob - * rowBlob row batch to deserialize - * @param rowsInBlob - * Total number of rows in rowBlob to deserialize - * @param reuseBatch - * VectorizedRowBatch to which the rows should be serialized * - * @throws SerDeException - */ - @Override - public void deserializeVector(Object rowBlob, int rowsInBlob, - VectorizedRowBatch reuseBatch) throws SerDeException { - - BytesRefArrayWritable[] refArray = (BytesRefArrayWritable[]) rowBlob; - DataOutputBuffer buffer = new DataOutputBuffer(); - for (int i = 0; i < rowsInBlob; i++) { - Object row = deserialize(refArray[i]); - try { - VectorizedBatchUtil.addRowToBatch(row, - (StructObjectInspector) cachedObjectInspector, i, - reuseBatch, buffer); - } catch (HiveException e) { - throw new SerDeException(e); - } - } - } -}
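
The serializeVector code removed above iterates the batch through the selected[] indirection (when selectedInUse is set) and special-cases repeating and null vectors before writing each column value. For reference, here is a minimal standalone sketch of the usual read pattern for one column of a VectorizedRowBatch, using only the public VectorizedRowBatch/ColumnVector fields; the class and the countNonNulls helper are illustrative only and are not part of this patch:

    import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

    public class BatchIterationSketch {
      /** Count non-null values in one column, honoring selected[] and isRepeating. */
      public static int countNonNulls(VectorizedRowBatch batch, int colNum) {
        ColumnVector col = batch.cols[colNum];
        int count = 0;
        for (int i = 0; i < batch.size; i++) {
          // selected[] is an indirection table that only applies when selectedInUse is set.
          int rowIndex = batch.selectedInUse ? batch.selected[i] : i;
          // A repeating vector keeps its single value (and its null flag) in slot 0.
          int valueIndex = col.isRepeating ? 0 : rowIndex;
          if (col.noNulls || !col.isNull[valueIndex]) {
            count++;
          }
        }
        return count;
      }
    }
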
http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java index 6557002..a904a50 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java @@ -20,16 +20,10 @@ package org.apache.hadoop.hive.ql.exec.vector; import java.io.IOException; import java.sql.Date; import java.sql.Timestamp; -import java.util.ArrayList; import java.util.Arrays; -import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,337 +31,266 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.common.type.HiveIntervalDayTime; import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.IOPrepareCache; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.PartitionDesc; -import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; -import org.apache.hadoop.hive.serde2.Deserializer; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileSplit; import org.apache.hive.common.util.DateUtils; /** - * Context for Vectorized row batch. this calss does eager deserialization of row data using serde + * Context for Vectorized row batch. this class does eager deserialization of row data using serde * in the RecordReader layer. * It has supports partitions in this layer so that the vectorized batch is populated correctly * with the partition column. 
*/ public class VectorizedRowBatchCtx { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(VectorizedRowBatchCtx.class.getName()); - // OI for raw row data (EG without partition cols) - private StructObjectInspector rawRowOI; + // The following information is for creating VectorizedRowBatch and for helping with + // knowing how the table is partitioned. + // + // It will be stored in MapWork and ReduceWork. + private String[] rowColumnNames; + private TypeInfo[] rowColumnTypeInfos; + private int dataColumnCount; + private int partitionColumnCount; - // OI for the row (Raw row OI + partition OI) - private StructObjectInspector rowOI; + private String[] scratchColumnTypeNames; - // Deserializer for the row data - private Deserializer deserializer; + /** + * Constructor for VectorizedRowBatchCtx + */ + public VectorizedRowBatchCtx() { + } - // Hash map of partition values. Key=TblColName value=PartitionValue - private Map<String, Object> partitionValues; - - //partition types - private Map<String, PrimitiveCategory> partitionTypes; + public VectorizedRowBatchCtx(String[] rowColumnNames, TypeInfo[] rowColumnTypeInfos, + int partitionColumnCount, String[] scratchColumnTypeNames) { + this.rowColumnNames = rowColumnNames; + this.rowColumnTypeInfos = rowColumnTypeInfos; + this.partitionColumnCount = partitionColumnCount; + this.scratchColumnTypeNames = scratchColumnTypeNames; - // partition column positions, for use by classes that need to know whether a given column is a - // partition column - private Set<Integer> partitionCols; - - // Column projection list - List of column indexes to include. This - // list does not contain partition columns - private List<Integer> colsToInclude; + dataColumnCount = rowColumnTypeInfos.length - partitionColumnCount; + } - private Map<Integer, String> scratchColumnTypeMap = null; + public String[] getRowColumnNames() { + return rowColumnNames; + } - /** - * Constructor for VectorizedRowBatchCtx - * - * @param rawRowOI - * OI for raw row data (EG without partition cols) - * @param rowOI - * OI for the row (Raw row OI + partition OI) - * @param deserializer - * Deserializer for the row data - * @param partitionValues - * Hash map of partition values. Key=TblColName value=PartitionValue - */ - public VectorizedRowBatchCtx(StructObjectInspector rawRowOI, StructObjectInspector rowOI, - Deserializer deserializer, Map<String, Object> partitionValues, - Map<String, PrimitiveCategory> partitionTypes) { - this.rowOI = rowOI; - this.rawRowOI = rawRowOI; - this.deserializer = deserializer; - this.partitionValues = partitionValues; - this.partitionTypes = partitionTypes; + public TypeInfo[] getRowColumnTypeInfos() { + return rowColumnTypeInfos; } - /** - * Constructor for VectorizedRowBatchCtx - */ - public VectorizedRowBatchCtx() { + public int getDataColumnCount() { + return dataColumnCount; + } + public int getPartitionColumnCount() { + return partitionColumnCount; + } + + public String[] getScratchColumnTypeNames() { + return scratchColumnTypeNames; } /** - * Initializes the VectorizedRowBatch context based on an scratch column type map and + * Initializes the VectorizedRowBatch context based on an scratch column type names and * object inspector. 
- * @param scratchColumnTypeMap - * @param rowOI + * @param structObjectInspector + * @param scratchColumnTypeNames * Object inspector that shapes the column types + * @throws HiveException */ - public void init(Map<Integer, String> scratchColumnTypeMap, - StructObjectInspector rowOI) { - this.scratchColumnTypeMap = scratchColumnTypeMap; - this.rowOI= rowOI; - this.rawRowOI = rowOI; + public void init(StructObjectInspector structObjectInspector, String[] scratchColumnTypeNames) + throws HiveException { + + // Row column information. + rowColumnNames = VectorizedBatchUtil.columnNamesFromStructObjectInspector(structObjectInspector); + rowColumnTypeInfos = VectorizedBatchUtil.typeInfosFromStructObjectInspector(structObjectInspector); + partitionColumnCount = 0; + dataColumnCount = rowColumnTypeInfos.length; + + // Scratch column information. + this.scratchColumnTypeNames = scratchColumnTypeNames; } - /** - * Initializes VectorizedRowBatch context based on the - * split and Hive configuration (Job conf with hive Plan). - * - * @param hiveConf - * Hive configuration using Hive plan is extracted - * @param split - * File split of the file being read - * @throws ClassNotFoundException - * @throws IOException - * @throws SerDeException - * @throws InstantiationException - * @throws IllegalAccessException - * @throws HiveException - */ - public void init(Configuration hiveConf, FileSplit split) throws ClassNotFoundException, - IOException, - SerDeException, - InstantiationException, - IllegalAccessException, - HiveException { + public static void getPartitionValues(VectorizedRowBatchCtx vrbCtx, Configuration hiveConf, + FileSplit split, Object[] partitionValues) throws IOException { Map<String, PartitionDesc> pathToPartitionInfo = Utilities .getMapWork(hiveConf).getPathToPartitionInfo(); - PartitionDesc part = HiveFileFormatUtils + PartitionDesc partDesc = HiveFileFormatUtils .getPartitionDescFromPathRecursively(pathToPartitionInfo, split.getPath(), IOPrepareCache.get().getPartitionDescMap()); - String partitionPath = split.getPath().getParent().toString(); - scratchColumnTypeMap = Utilities.getMapWorkVectorScratchColumnTypeMap(hiveConf); - // LOG.info("VectorizedRowBatchCtx init scratchColumnTypeMap " + scratchColumnTypeMap.toString()); - - Properties partProps = - (part.getPartSpec() == null || part.getPartSpec().isEmpty()) ? - part.getTableDesc().getProperties() : part.getProperties(); - - Class serdeclass = hiveConf.getClassByName(part.getSerdeClassName()); - Deserializer partDeserializer = (Deserializer) serdeclass.newInstance(); - SerDeUtils.initializeSerDe(partDeserializer, hiveConf, part.getTableDesc().getProperties(), - partProps); - StructObjectInspector partRawRowObjectInspector = (StructObjectInspector) partDeserializer - .getObjectInspector(); - - deserializer = partDeserializer; - - // Check to see if this split is part of a partition of a table - String pcols = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS); - - String[] partKeys = null; - if (pcols != null && pcols.length() > 0) { - - // Partitions exist for this table. 
Get the partition object inspector and - // raw row object inspector (row with out partition col) - LinkedHashMap<String, String> partSpec = part.getPartSpec(); - partKeys = pcols.trim().split("/"); - String pcolTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES); - String[] partKeyTypes = pcolTypes.trim().split(":"); - - if (partKeys.length > partKeyTypes.length) { - throw new HiveException("Internal error : partKeys length, " +partKeys.length + - " greater than partKeyTypes length, " + partKeyTypes.length); - } - - List<String> partNames = new ArrayList<String>(partKeys.length); - List<ObjectInspector> partObjectInspectors = new ArrayList<ObjectInspector>(partKeys.length); - partitionValues = new LinkedHashMap<String, Object>(); - partitionTypes = new LinkedHashMap<String, PrimitiveCategory>(); - for (int i = 0; i < partKeys.length; i++) { - String key = partKeys[i]; - partNames.add(key); - ObjectInspector objectInspector = null; - Object objectVal; - if (partSpec == null) { - // for partitionless table, initialize partValue to empty string. - // We can have partitionless table even if we have partition keys - // when there is only only partition selected and the partition key is not - // part of the projection/include list. - objectVal = null; - objectInspector = PrimitiveObjectInspectorFactory.javaStringObjectInspector; - partitionTypes.put(key, PrimitiveCategory.STRING); - } else { - // Create a Standard java object Inspector - objectInspector = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo( - TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i])); - objectVal = - ObjectInspectorConverters. - getConverter(PrimitiveObjectInspectorFactory. - javaStringObjectInspector, objectInspector). - convert(partSpec.get(key)); - partitionTypes.put(key, TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i]).getPrimitiveCategory()); - } - if (LOG.isDebugEnabled()) { - LOG.debug("Partition column: name: " + key + ", value: " + objectVal + ", type: " + partitionTypes.get(key)); - } - partitionValues.put(key, objectVal); - partObjectInspectors.add(objectInspector); - } - - // Create partition OI - StructObjectInspector partObjectInspector = ObjectInspectorFactory - .getStandardStructObjectInspector(partNames, partObjectInspectors); - - // Get row OI from partition OI and raw row OI - StructObjectInspector rowObjectInspector = ObjectInspectorFactory - .getUnionStructObjectInspector(Arrays - .asList(new StructObjectInspector[] {partRawRowObjectInspector, partObjectInspector})); - rowOI = rowObjectInspector; - rawRowOI = partRawRowObjectInspector; - - // We have to do this after we've set rowOI, as getColIndexBasedOnColName uses it - partitionCols = new HashSet<Integer>(); - if (pcols != null && pcols.length() > 0) { - for (int i = 0; i < partKeys.length; i++) { - partitionCols.add(getColIndexBasedOnColName(partKeys[i])); - } - } + getPartitionValues(vrbCtx, partDesc, partitionValues); - } else { + } - // No partitions for this table, hence row OI equals raw row OI - rowOI = partRawRowObjectInspector; - rawRowOI = partRawRowObjectInspector; + public static void getPartitionValues(VectorizedRowBatchCtx vrbCtx, PartitionDesc partDesc, + Object[] partitionValues) { + + LinkedHashMap<String, String> partSpec = partDesc.getPartSpec(); + + for (int i = 0; i < vrbCtx.partitionColumnCount; i++) { + Object objectValue; + if (partSpec == null) { + // For partition-less table, initialize partValue to empty string. 
+ // We can have partition-less table even if we have partition keys + // when there is only only partition selected and the partition key is not + // part of the projection/include list. + objectValue = null; + } else { + String key = vrbCtx.rowColumnNames[vrbCtx.dataColumnCount + i]; + + // Create a Standard java object Inspector + ObjectInspector objectInspector = + TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo( + vrbCtx.rowColumnTypeInfos[vrbCtx.dataColumnCount + i]); + objectValue = + ObjectInspectorConverters. + getConverter(PrimitiveObjectInspectorFactory. + javaStringObjectInspector, objectInspector). + convert(partSpec.get(key)); + } + partitionValues[i] = objectValue; } - - colsToInclude = ColumnProjectionUtils.getReadColumnIDs(hiveConf); } - + /** * Creates a Vectorized row batch and the column vectors. * * @return VectorizedRowBatch * @throws HiveException */ - public VectorizedRowBatch createVectorizedRowBatch() throws HiveException { - List<? extends StructField> fieldRefs = rowOI.getAllStructFieldRefs(); - VectorizedRowBatch result = new VectorizedRowBatch(fieldRefs.size()); - for (int j = 0; j < fieldRefs.size(); j++) { - // If the column is included in the include list or if the column is a - // partition column then create the column vector. Also note that partition columns are not - // in the included list. - if ((colsToInclude == null) || colsToInclude.contains(j) - || ((partitionValues != null) && - partitionValues.containsKey(fieldRefs.get(j).getFieldName()))) { - ObjectInspector foi = fieldRefs.get(j).getFieldObjectInspector(); - result.cols[j] = VectorizedBatchUtil.createColumnVector(foi); - } + public VectorizedRowBatch createVectorizedRowBatch() + { + int totalColumnCount = rowColumnTypeInfos.length + scratchColumnTypeNames.length; + VectorizedRowBatch result = new VectorizedRowBatch(totalColumnCount); + + LOG.info("createVectorizedRowBatch columnsToIncludeTruncated NONE"); + for (int i = 0; i < rowColumnTypeInfos.length; i++) { + TypeInfo typeInfo = rowColumnTypeInfos[i]; + result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo); + } + + for (int i = 0; i < scratchColumnTypeNames.length; i++) { + String typeName = scratchColumnTypeNames[i]; + result.cols[rowColumnTypeInfos.length + i] = + VectorizedBatchUtil.createColumnVector(typeName); } - result.numCols = fieldRefs.size(); - this.addScratchColumnsToBatch(result); + + result.setPartitionInfo(dataColumnCount, partitionColumnCount); + result.reset(); return result; } - /** - * Adds the row to the batch after deserializing the row - * - * @param rowIndex - * Row index in the batch to which the row is added - * @param rowBlob - * Row blob (serialized version of row) - * @param batch - * Vectorized batch to which the row is added - * @param buffer a buffer to copy strings into - * @throws HiveException - * @throws SerDeException - */ - public void addRowToBatch(int rowIndex, Writable rowBlob, - VectorizedRowBatch batch, - DataOutputBuffer buffer - ) throws HiveException, SerDeException + public VectorizedRowBatch createVectorizedRowBatch(boolean[] columnsToIncludeTruncated) { - Object row = this.deserializer.deserialize(rowBlob); - VectorizedBatchUtil.addRowToBatch(row, this.rawRowOI, rowIndex, batch, buffer); - } + if (columnsToIncludeTruncated == null) { + return createVectorizedRowBatch(); + } - /** - * Deserialized set of rows and populates the batch - * - * @param rowBlob - * to deserialize - * @param batch - * Vectorized row batch which contains deserialized data - * @throws SerDeException 
- */ - public void convertRowBatchBlobToVectorizedBatch(Object rowBlob, int rowsInBlob, - VectorizedRowBatch batch) - throws SerDeException { - - if (deserializer instanceof VectorizedSerde) { - ((VectorizedSerde) deserializer).deserializeVector(rowBlob, rowsInBlob, batch); - } else { - throw new SerDeException( - "Not able to deserialize row batch. Serde does not implement VectorizedSerde"); + LOG.info("createVectorizedRowBatch columnsToIncludeTruncated " + Arrays.toString(columnsToIncludeTruncated)); + int totalColumnCount = rowColumnTypeInfos.length + scratchColumnTypeNames.length; + VectorizedRowBatch result = new VectorizedRowBatch(totalColumnCount); + + for (int i = 0; i < columnsToIncludeTruncated.length; i++) { + if (columnsToIncludeTruncated[i]) { + TypeInfo typeInfo = rowColumnTypeInfos[i]; + result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo); + } + } + + for (int i = dataColumnCount; i < dataColumnCount + partitionColumnCount; i++) { + TypeInfo typeInfo = rowColumnTypeInfos[i]; + result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo); } + + for (int i = 0; i < scratchColumnTypeNames.length; i++) { + String typeName = scratchColumnTypeNames[i]; + result.cols[rowColumnTypeInfos.length + i] = + VectorizedBatchUtil.createColumnVector(typeName); + } + + result.setPartitionInfo(dataColumnCount, partitionColumnCount); + + result.reset(); + return result; } - private int getColIndexBasedOnColName(String colName) throws HiveException - { - List<? extends StructField> fieldRefs = rowOI.getAllStructFieldRefs(); - for (int i = 0; i < fieldRefs.size(); i++) { - if (fieldRefs.get(i).getFieldName().equals(colName)) { - return i; + public boolean[] getColumnsToIncludeTruncated(Configuration conf) { + boolean[] columnsToIncludeTruncated = null; + + List<Integer> columnsToIncludeTruncatedList = ColumnProjectionUtils.getReadColumnIDs(conf); + if (columnsToIncludeTruncatedList != null && columnsToIncludeTruncatedList.size() > 0 ) { + + // Partitioned columns will not be in the include list. + + boolean[] columnsToInclude = new boolean[dataColumnCount]; + Arrays.fill(columnsToInclude, false); + for (int columnNum : columnsToIncludeTruncatedList) { + if (columnNum < dataColumnCount) { + columnsToInclude[columnNum] = true; + } + } + + // Work backwards to find the highest wanted column. + + int highestWantedColumnNum = -1; + for (int i = dataColumnCount - 1; i >= 0; i--) { + if (columnsToInclude[i]) { + highestWantedColumnNum = i; + break; + } + } + if (highestWantedColumnNum == -1) { + throw new RuntimeException("No columns to include?"); + } + int newColumnCount = highestWantedColumnNum + 1; + if (newColumnCount == dataColumnCount) { + // Didn't trim any columns off the end. Use the original. 
+ columnsToIncludeTruncated = columnsToInclude; + } else { + columnsToIncludeTruncated = Arrays.copyOf(columnsToInclude, newColumnCount); } } - throw new HiveException("Not able to find column name in row object inspector"); + return columnsToIncludeTruncated; } - + /** * Add the partition values to the batch * * @param batch + * @param partitionValues * @throws HiveException */ - public void addPartitionColsToBatch(VectorizedRowBatch batch) throws HiveException { - int colIndex; - Object value; - PrimitiveCategory pCategory; + public void addPartitionColsToBatch(VectorizedRowBatch batch, Object[] partitionValues) + { if (partitionValues != null) { - for (String key : partitionValues.keySet()) { - colIndex = getColIndexBasedOnColName(key); - value = partitionValues.get(key); - pCategory = partitionTypes.get(key); - - switch (pCategory) { + for (int i = 0; i < partitionColumnCount; i++) { + Object value = partitionValues[i]; + + int colIndex = dataColumnCount + i; + String partitionColumnName = rowColumnNames[colIndex]; + PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) rowColumnTypeInfos[colIndex]; + switch (primitiveTypeInfo.getPrimitiveCategory()) { case BOOLEAN: { LongColumnVector lcv = (LongColumnVector) batch.cols[colIndex]; if (value == null) { @@ -519,7 +442,7 @@ public class VectorizedRowBatchCtx { HiveDecimal hd = (HiveDecimal) value; dv.set(0, hd); dv.isRepeating = true; - dv.isNull[0] = false; + dv.isNull[0] = false; } } break; @@ -548,15 +471,15 @@ public class VectorizedRowBatchCtx { bcv.isNull[0] = true; bcv.isRepeating = true; } else { - bcv.fill(sVal.getBytes()); + bcv.fill(sVal.getBytes()); bcv.isNull[0] = false; } } break; - + default: - throw new HiveException("Unable to recognize the partition type " + pCategory + - " for column " + key); + throw new RuntimeException("Unable to recognize the partition type " + primitiveTypeInfo.getPrimitiveCategory() + + " for column " + partitionColumnName); } } } @@ -564,64 +487,12 @@ public class VectorizedRowBatchCtx { /** * Determine whether a given column is a partition column - * @param colnum column number in + * @param colNum column number in * {@link org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch}s created by this context. * @return true if it is a partition column, false otherwise */ - public final boolean isPartitionCol(int colnum) { - return (partitionCols == null) ? false : partitionCols.contains(colnum); - } - - private void addScratchColumnsToBatch(VectorizedRowBatch vrb) throws HiveException { - if (scratchColumnTypeMap != null && !scratchColumnTypeMap.isEmpty()) { - int origNumCols = vrb.numCols; - int newNumCols = vrb.cols.length+scratchColumnTypeMap.keySet().size(); - vrb.cols = Arrays.copyOf(vrb.cols, newNumCols); - for (int i = origNumCols; i < newNumCols; i++) { - String typeName = scratchColumnTypeMap.get(i); - if (typeName == null) { - throw new HiveException("No type entry found for column " + i + " in map " + scratchColumnTypeMap.toString()); - } - vrb.cols[i] = allocateColumnVector(typeName, - VectorizedRowBatch.DEFAULT_SIZE); - } - vrb.numCols = vrb.cols.length; - } + public final boolean isPartitionCol(int colNum) { + return colNum >= dataColumnCount && colNum < rowColumnTypeInfos.length; } - /** - * Get the scale and precision for the given decimal type string. The decimal type is assumed to be - * of the format decimal(precision,scale) e.g. decimal(20,10). - * @param decimalType The given decimal type string. 
- * @return An integer array of size 2 with first element set to precision and second set to scale. - */ - private static int[] getScalePrecisionFromDecimalType(String decimalType) { - Pattern p = Pattern.compile("\\d+"); - Matcher m = p.matcher(decimalType); - m.find(); - int precision = Integer.parseInt(m.group()); - m.find(); - int scale = Integer.parseInt(m.group()); - int [] precScale = { precision, scale }; - return precScale; - } - - public static ColumnVector allocateColumnVector(String type, int defaultSize) { - if (type.equalsIgnoreCase("double")) { - return new DoubleColumnVector(defaultSize); - } else if (VectorizationContext.isStringFamily(type)) { - return new BytesColumnVector(defaultSize); - } else if (VectorizationContext.decimalTypePattern.matcher(type).matches()){ - int [] precisionScale = getScalePrecisionFromDecimalType(type); - return new DecimalColumnVector(defaultSize, precisionScale[0], precisionScale[1]); - } else if (type.equalsIgnoreCase("long") || - type.equalsIgnoreCase("date") || - type.equalsIgnoreCase("timestamp") || - type.equalsIgnoreCase(serdeConstants.INTERVAL_YEAR_MONTH_TYPE_NAME) || - type.equalsIgnoreCase(serdeConstants.INTERVAL_DAY_TIME_TYPE_NAME)) { - return new LongColumnVector(defaultSize); - } else { - throw new RuntimeException("Cannot allocate vector column for " + type); - } - } } http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java index 1d5a9de..6ecfaf7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java @@ -20,12 +20,8 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.concurrent.Future; - -import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang.ArrayUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,12 +30,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.HashTableLoaderFactory; import org.apache.hadoop.hive.ql.exec.HashTableLoader; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; -import org.apache.hadoop.hive.ql.exec.MapredContext; import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe; -import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer.ReusableGetAdaptor; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorColumnMapping; import org.apache.hadoop.hive.ql.exec.vector.VectorColumnOutputMapping; @@ -51,15 +42,12 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion; import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import 
org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; -import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type; import org.apache.hadoop.hive.ql.exec.vector.expressions.IdentityExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.optimized.VectorMapJoinOptimizedCreateHashTable; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTable; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinTableContainer; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastHashTableLoader; -import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -69,10 +57,8 @@ import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc; import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableImplementationType; import org.apache.hadoop.hive.ql.plan.api.OperatorType; import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; @@ -576,7 +562,7 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem smallTableVectorDeserializeRow = new VectorDeserializeRow<LazyBinaryDeserializeRead>( new LazyBinaryDeserializeRead( - VectorizedBatchUtil.primitiveTypeInfosFromTypeNames( + VectorizedBatchUtil.typeInfosFromTypeNames( smallTableMapping.getTypeNames()))); smallTableVectorDeserializeRow.init(smallTableMapping.getOutputColumns()); } @@ -649,22 +635,12 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem * build join output results in. */ protected VectorizedRowBatch setupOverflowBatch() throws HiveException { + + int initialColumnCount = vContext.firstOutputColumnIndex(); VectorizedRowBatch overflowBatch; - Map<Integer, String> scratchColumnTypeMap = vOutContext.getScratchColumnTypeMap(); - int maxColumn = 0; - for (int i = 0; i < outputProjection.length; i++) { - int outputColumn = outputProjection[i]; - if (maxColumn < outputColumn) { - maxColumn = outputColumn; - } - } - for (int outputColumn : scratchColumnTypeMap.keySet()) { - if (maxColumn < outputColumn) { - maxColumn = outputColumn; - } - } - overflowBatch = new VectorizedRowBatch(maxColumn + 1); + int totalNumColumns = initialColumnCount + vOutContext.getScratchColumnTypeNames().length; + overflowBatch = new VectorizedRowBatch(totalNumColumns); // First, just allocate just the projection columns we will be using. for (int i = 0; i < outputProjection.length; i++) { @@ -674,9 +650,9 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem } // Now, add any scratch columns needed for children operators. 
- for (int outputColumn : scratchColumnTypeMap.keySet()) { - String typeName = scratchColumnTypeMap.get(outputColumn); - allocateOverflowBatchColumnVector(overflowBatch, outputColumn, typeName); + int outputColumn = initialColumnCount; + for (String typeName : vOutContext.getScratchColumnTypeNames()) { + allocateOverflowBatchColumnVector(overflowBatch, outputColumn++, typeName); } overflowBatch.projectedColumns = outputProjection; @@ -696,33 +672,9 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem if (overflowBatch.cols[outputColumn] == null) { typeName = VectorizationContext.mapTypeNameSynonyms(typeName); - String columnVectorTypeName; - TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeName); - Type columnVectorType = VectorizationContext.getColumnVectorTypeFromTypeInfo(typeInfo); - - switch (columnVectorType) { - case LONG: - columnVectorTypeName = "long"; - break; - - case DOUBLE: - columnVectorTypeName = "double"; - break; - - case BYTES: - columnVectorTypeName = "string"; - break; - - case DECIMAL: - columnVectorTypeName = typeName; // Keep precision and scale. - break; - - default: - throw new HiveException("Unexpected column vector type " + columnVectorType); - } - overflowBatch.cols[outputColumn] = VectorizedRowBatchCtx.allocateColumnVector(columnVectorTypeName, VectorizedRowBatch.DEFAULT_SIZE); + overflowBatch.cols[outputColumn] = VectorizedBatchUtil.createColumnVector(typeInfo); if (isLogDebugEnabled) { LOG.debug(taskName + ", " + getOperatorId() + " VectorMapJoinCommonOperator initializeOp overflowBatch outputColumn " + outputColumn + " class " + overflowBatch.cols[outputColumn].getClass().getSimpleName()); http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java index b20cca4..2d9da84 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.hive.serde2.ByteStream.Output; /** @@ -413,8 +414,8 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC private void setupSpillSerDe(VectorizedRowBatch batch) throws HiveException { - PrimitiveTypeInfo[] inputObjInspectorsTypeInfos = - VectorizedBatchUtil.primitiveTypeInfosFromStructObjectInspector( + TypeInfo[] inputObjInspectorsTypeInfos = + VectorizedBatchUtil.typeInfosFromStructObjectInspector( (StructObjectInspector) inputObjInspectors[posBigTable]); List<Integer> projectedColumns = vContext.getProjectedColumns(); http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ---------------------------------------------------------------------- diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index a883124..b63deb2 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -510,11 +510,16 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> // ensure filters are not set from previous pushFilters jobConf.unset(TableScanDesc.FILTER_TEXT_CONF_STR); jobConf.unset(TableScanDesc.FILTER_EXPR_CONF_STR); + + Utilities.unsetSchemaEvolution(jobConf); + TableScanDesc scanDesc = tableScan.getConf(); if (scanDesc == null) { return; } + Utilities.addTableSchemaToConf(jobConf, tableScan); + // construct column name list and types for reference by filter push down Utilities.setColumnNameList(jobConf, tableScan); Utilities.setColumnTypeList(jobConf, tableScan); http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java b/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java index 9879dfe..8d94da8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java @@ -36,6 +36,17 @@ public final class IOConstants { public static final String AVRO = "AVRO"; public static final String AVROFILE = "AVROFILE"; + /** + * The desired TABLE column names and types for input format schema evolution. + * This is different than COLUMNS and COLUMNS_TYPES, which are based on individual partition + * metadata. + * + * Virtual columns and partition columns are not included + * + */ + public static final String SCHEMA_EVOLUTION_COLUMNS = "schema.evolution.columns"; + public static final String SCHEMA_EVOLUTION_COLUMNS_TYPES = "schema.evolution.columns.types"; + @VisibleForTesting public static final String CUSTOM_TEXT_SERDE = "CustomTextSerde"; http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/io/SelfDescribingInputFormatInterface.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/SelfDescribingInputFormatInterface.java b/ql/src/java/org/apache/hadoop/hive/ql/io/SelfDescribingInputFormatInterface.java new file mode 100644 index 0000000..6c455bd --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/SelfDescribingInputFormatInterface.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.io; + +/** + * Marker interface to indicate a given input format is self-describing and + * can perform schema evolution itself. + */ +public interface SelfDescribingInputFormatInterface { + +} http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java deleted file mode 100644 index e9e1d5a..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.io; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; - -/** - * A MapReduce/Hive Vectorized input format for RC files. 
- */ -public class VectorizedRCFileInputFormat extends FileInputFormat<NullWritable, VectorizedRowBatch> - implements InputFormatChecker { - - public VectorizedRCFileInputFormat() { - setMinSplitSize(SequenceFile.SYNC_INTERVAL); - } - - @Override - @SuppressWarnings("unchecked") - public RecordReader<NullWritable, VectorizedRowBatch> getRecordReader(InputSplit split, JobConf job, - Reporter reporter) throws IOException { - - reporter.setStatus(split.toString()); - - return new VectorizedRCFileRecordReader(job, (FileSplit) split); - } - - @Override - public boolean validateInput(FileSystem fs, HiveConf conf, - List<FileStatus> files) throws IOException { - if (files.size() <= 0) { - return false; - } - for (int fileId = 0; fileId < files.size(); fileId++) { - RCFile.Reader reader = null; - try { - reader = new RCFile.Reader(fs, files.get(fileId) - .getPath(), conf); - reader.close(); - reader = null; - } catch (IOException e) { - return false; - } finally { - if (null != reader) { - reader.close(); - } - } - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java deleted file mode 100644 index 4cc1c2f..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java +++ /dev/null @@ -1,261 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.io; - -import java.io.IOException; -import java.util.Collections; -import java.util.Map; -import java.util.WeakHashMap; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; -import org.apache.hadoop.hive.ql.io.RCFile.KeyBuffer; -import org.apache.hadoop.hive.ql.io.RCFile.Reader; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.RecordReader; - -/** - * RCFileRecordReader. 
- */ -public class VectorizedRCFileRecordReader implements RecordReader<NullWritable, VectorizedRowBatch> { - - private final Reader in; - private final long start; - private final long end; - private boolean more = true; - protected Configuration conf; - private final FileSplit split; - private final boolean useCache; - private VectorizedRowBatchCtx rbCtx; - private final LongWritable keyCache = new LongWritable(); - private final BytesRefArrayWritable colsCache = new BytesRefArrayWritable(); - private boolean addPartitionCols = true; - private final DataOutputBuffer buffer = new DataOutputBuffer(); - - private static RCFileSyncCache syncCache = new RCFileSyncCache(); - - private static final class RCFileSyncEntry { - long end; - long endSync; - } - - private static final class RCFileSyncCache { - - private final Map<String, RCFileSyncEntry> cache; - - public RCFileSyncCache() { - cache = Collections.synchronizedMap(new WeakHashMap<String, RCFileSyncEntry>()); - } - - public void put(FileSplit split, long endSync) { - Path path = split.getPath(); - long end = split.getStart() + split.getLength(); - String key = path.toString() + "+" + String.format("%d", end); - - RCFileSyncEntry entry = new RCFileSyncEntry(); - entry.end = end; - entry.endSync = endSync; - if (entry.endSync >= entry.end) { - cache.put(key, entry); - } - } - - public long get(FileSplit split) { - Path path = split.getPath(); - long start = split.getStart(); - String key = path.toString() + "+" + String.format("%d", start); - RCFileSyncEntry entry = cache.get(key); - if (entry != null) { - return entry.endSync; - } - return -1; - } - } - - public VectorizedRCFileRecordReader(Configuration conf, FileSplit split) - throws IOException { - - Path path = split.getPath(); - FileSystem fs = path.getFileSystem(conf); - this.in = new RCFile.Reader(fs, path, conf); - this.end = split.getStart() + split.getLength(); - this.conf = conf; - this.split = split; - - useCache = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEUSERCFILESYNCCACHE); - - if (split.getStart() > in.getPosition()) { - long oldSync = useCache ? syncCache.get(split) : -1; - if (oldSync == -1) { - in.sync(split.getStart()); // sync to start - } else { - in.seek(oldSync); - } - } - - this.start = in.getPosition(); - - more = start < end; - try { - rbCtx = new VectorizedRowBatchCtx(); - rbCtx.init(conf, split); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public Class<?> getKeyClass() { - return LongWritable.class; - } - - public Class<?> getValueClass() { - return BytesRefArrayWritable.class; - } - - @Override - public NullWritable createKey() { - return NullWritable.get(); - } - - @Override - public VectorizedRowBatch createValue() { - VectorizedRowBatch result; - try { - result = rbCtx.createVectorizedRowBatch(); - } catch (HiveException e) { - throw new RuntimeException("Error creating a batch", e); - } - return result; - } - - public boolean nextBlock() throws IOException { - return in.nextBlock(); - } - - @Override - public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException { - - // Reset column fields noNull values to true - VectorizedBatchUtil.setNoNullFields(value); - buffer.reset(); - value.selectedInUse = false; - for (int i = 0; i < value.numCols; i++) { - value.cols[i].isRepeating = false; - } - - int i = 0; - try { - - for (; i < VectorizedRowBatch.DEFAULT_SIZE; i++) { - more = next(keyCache); - if (more) { - // Check and update partition cols if necessary. 
Ideally this should be done - // in CreateValue() as the partition is constant per split. But since Hive uses - // CombineHiveRecordReader and as this does not call CreateValue() for - // each new RecordReader it creates, this check is required in next() - if (addPartitionCols) { - rbCtx.addPartitionColsToBatch(value); - addPartitionCols = false; - } - in.getCurrentRow(colsCache); - // Currently RCFile reader does not support reading vectorized - // data. Populating the batch by adding one row at a time. - rbCtx.addRowToBatch(i, (Writable) colsCache, value, buffer); - } else { - break; - } - } - } catch (Exception e) { - throw new RuntimeException("Error while getting next row", e); - } - value.size = i; - return more; - } - - protected boolean next(LongWritable key) throws IOException { - if (!more) { - return false; - } - - more = in.next(key); - - long lastSeenSyncPos = in.lastSeenSyncPos(); - - if (lastSeenSyncPos >= end) { - if (useCache) { - syncCache.put(split, lastSeenSyncPos); - } - more = false; - return more; - } - return more; - } - - /** - * Return the progress within the input split. - * - * @return 0.0 to 1.0 of the input byte range - */ - public float getProgress() throws IOException { - if (end == start) { - return 0.0f; - } else { - return Math.min(1.0f, (in.getPosition() - start) / (float) (end - start)); - } - } - - public long getPos() throws IOException { - return in.getPosition(); - } - - public KeyBuffer getKeyBuffer() { - return in.getCurrentKeyBufferObj(); - } - - protected void seek(long pos) throws IOException { - in.seek(pos); - } - - public void sync(long pos) throws IOException { - in.sync(pos); - } - - public void resetBuffer() { - in.resetBuffer(); - } - - public long getStart() { - return start; - } - - public void close() throws IOException { - in.close(); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ConversionTreeReaderFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ConversionTreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ConversionTreeReaderFactory.java deleted file mode 100644 index aaf4eb4..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ConversionTreeReaderFactory.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.io.orc; - -import java.io.IOException; -import java.util.List; - -/** - * Factory for creating ORC tree readers. These tree readers can handle type promotions and type - * conversions. - */ -public class ConversionTreeReaderFactory extends TreeReaderFactory { - - // TODO: This is currently only a place holder for type conversions. 
- - public static TreeReader createTreeReader(int columnId, - List<OrcProto.Type> types, - boolean[] included, - boolean skipCorrupt - ) throws IOException { - return TreeReaderFactory.createTreeReader(columnId, types, included, skipCorrupt); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java index b4dd4ab..56ac40b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java @@ -402,7 +402,7 @@ public final class OrcFile { public WriterOptions inspector(ObjectInspector value) { this.inspector = value; if (!explicitSchema) { - schema = OrcOutputFormat.convertTypeInfo( + schema = OrcUtils.convertTypeInfo( TypeInfoUtils.getTypeInfoFromObjectInspector(value)); } return this; http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 46862da..714af23 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -61,8 +61,10 @@ import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; import org.apache.hadoop.hive.ql.io.InputFormatChecker; import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface; import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface; import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader; import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterVersion; +import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type; import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; @@ -73,7 +75,6 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; import org.apache.hadoop.hive.shims.ShimLoader; @@ -116,7 +117,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; */ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, InputFormatChecker, VectorizedInputFormatInterface, LlapWrappableInputFormatInterface, - AcidInputFormat<NullWritable, OrcStruct>, CombineHiveInputFormat.AvoidSplitCombination { + SelfDescribingInputFormatInterface, AcidInputFormat<NullWritable, OrcStruct>, + CombineHiveInputFormat.AvoidSplitCombination { static enum SplitStrategyKind { HYBRID, @@ -232,7 +234,14 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, Configuration conf, long offset, long length ) throws IOException { + + /** + * Do we have schema on read in the configuration variables? 
+ */ + TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf); + Reader.Options options = new Reader.Options().range(offset, length); + options.schema(schema); boolean isOriginal = isOriginal(file); List<OrcProto.Type> types = file.getTypes(); options.include(genIncludedColumns(types, conf, isOriginal)); @@ -1415,7 +1424,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, if (vectorMode) { return (org.apache.hadoop.mapred.RecordReader) - new VectorizedOrcAcidRowReader(inner, conf, (FileSplit) inputSplit); + new VectorizedOrcAcidRowReader(inner, conf, + Utilities.getMapWork(conf).getVectorizedRowBatchCtx(), (FileSplit) inputSplit); } return new NullKeyRecordReader(inner, conf); } @@ -1467,10 +1477,14 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, } } + // The schema type description does not include the ACID fields (i.e. it is the + // non-ACID original schema). + private static boolean SCHEMA_TYPES_IS_ORIGINAL = true; @Override public RowReader<OrcStruct> getReader(InputSplit inputSplit, - Options options) throws IOException { + Options options) + throws IOException { final OrcSplit split = (OrcSplit) inputSplit; final Path path = split.getPath(); Path root; @@ -1485,36 +1499,30 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, } final Path[] deltas = AcidUtils.deserializeDeltas(root, split.getDeltas()); final Configuration conf = options.getConfiguration(); + + + /** + * Do we have schema on read in the configuration variables? + */ + TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf); + final Reader reader; final int bucket; - Reader.Options readOptions = new Reader.Options(); + Reader.Options readOptions = new Reader.Options().schema(schema); readOptions.range(split.getStart(), split.getLength()); + + // TODO: Convert genIncludedColumns and setSearchArgument to use TypeDescription. 
+ final List<Type> schemaTypes = OrcUtils.getOrcTypes(schema); + readOptions.include(genIncludedColumns(schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL)); + setSearchArgument(readOptions, schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL); + if (split.hasBase()) { bucket = AcidUtils.parseBaseBucketFilename(split.getPath(), conf) .getBucket(); reader = OrcFile.createReader(path, OrcFile.readerOptions(conf)); - final List<OrcProto.Type> types = reader.getTypes(); - readOptions.include(genIncludedColumns(types, conf, split.isOriginal())); - setSearchArgument(readOptions, types, conf, split.isOriginal()); } else { bucket = (int) split.getStart(); reader = null; - if(deltas != null && deltas.length > 0) { - Path bucketPath = AcidUtils.createBucketFile(deltas[0], bucket); - OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf); - FileSystem fs = readerOptions.getFilesystem(); - if(fs == null) { - fs = path.getFileSystem(options.getConfiguration()); - } - if(fs.exists(bucketPath)) { - /* w/o schema evolution (which ACID doesn't support yet) all delta - files have the same schema, so choosing the 1st one*/ - final List<OrcProto.Type> types = - OrcFile.createReader(bucketPath, readerOptions).getTypes(); - readOptions.include(genIncludedColumns(types, conf, split.isOriginal())); - setSearchArgument(readOptions, types, conf, split.isOriginal()); - } - } } String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY, Long.MAX_VALUE + ":"); @@ -1527,9 +1535,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, @Override public ObjectInspector getObjectInspector() { - return ((StructObjectInspector) records.getObjectInspector()) - .getAllStructFieldRefs().get(OrcRecordUpdater.ROW) - .getFieldObjectInspector(); + return OrcStruct.createObjectInspector(0, schemaTypes); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java index 8a5de7f..2d0eaaf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java @@ -68,88 +68,6 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow> private static final Logger LOG = LoggerFactory.getLogger(OrcOutputFormat.class); - static TypeDescription convertTypeInfo(TypeInfo info) { - switch (info.getCategory()) { - case PRIMITIVE: { - PrimitiveTypeInfo pinfo = (PrimitiveTypeInfo) info; - switch (pinfo.getPrimitiveCategory()) { - case BOOLEAN: - return TypeDescription.createBoolean(); - case BYTE: - return TypeDescription.createByte(); - case SHORT: - return TypeDescription.createShort(); - case INT: - return TypeDescription.createInt(); - case LONG: - return TypeDescription.createLong(); - case FLOAT: - return TypeDescription.createFloat(); - case DOUBLE: - return TypeDescription.createDouble(); - case STRING: - return TypeDescription.createString(); - case DATE: - return TypeDescription.createDate(); - case TIMESTAMP: - return TypeDescription.createTimestamp(); - case BINARY: - return TypeDescription.createBinary(); - case DECIMAL: { - DecimalTypeInfo dinfo = (DecimalTypeInfo) pinfo; - return TypeDescription.createDecimal() - .withScale(dinfo.getScale()) - .withPrecision(dinfo.getPrecision()); - } - case VARCHAR: { - BaseCharTypeInfo cinfo 
= (BaseCharTypeInfo) pinfo; - return TypeDescription.createVarchar() - .withMaxLength(cinfo.getLength()); - } - case CHAR: { - BaseCharTypeInfo cinfo = (BaseCharTypeInfo) pinfo; - return TypeDescription.createChar() - .withMaxLength(cinfo.getLength()); - } - default: - throw new IllegalArgumentException("ORC doesn't handle primitive" + - " category " + pinfo.getPrimitiveCategory()); - } - } - case LIST: { - ListTypeInfo linfo = (ListTypeInfo) info; - return TypeDescription.createList - (convertTypeInfo(linfo.getListElementTypeInfo())); - } - case MAP: { - MapTypeInfo minfo = (MapTypeInfo) info; - return TypeDescription.createMap - (convertTypeInfo(minfo.getMapKeyTypeInfo()), - convertTypeInfo(minfo.getMapValueTypeInfo())); - } - case UNION: { - UnionTypeInfo minfo = (UnionTypeInfo) info; - TypeDescription result = TypeDescription.createUnion(); - for (TypeInfo child: minfo.getAllUnionObjectTypeInfos()) { - result.addUnionChild(convertTypeInfo(child)); - } - return result; - } - case STRUCT: { - StructTypeInfo sinfo = (StructTypeInfo) info; - TypeDescription result = TypeDescription.createStruct(); - for(String fieldName: sinfo.getAllStructFieldNames()) { - result.addField(fieldName, - convertTypeInfo(sinfo.getStructFieldTypeInfo(fieldName))); - } - return result; - } - default: - throw new IllegalArgumentException("ORC doesn't handle " + - info.getCategory()); - } - } - private static class OrcRecordWriter implements RecordWriter<NullWritable, OrcSerdeRow>, StatsProvidingRecordWriter { @@ -242,7 +160,7 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow> TypeDescription schema = TypeDescription.createStruct(); for (int i = 0; i < columnNames.size(); ++i) { schema.addField(columnNames.get(i), - convertTypeInfo(columnTypes.get(i))); + OrcUtils.convertTypeInfo(columnTypes.get(i))); } if (LOG.isDebugEnabled()) { LOG.debug("ORC schema = " + schema); http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index ebe1afd..bc4d2ee 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.io.orc; import com.google.common.annotations.VisibleForTesting; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -25,23 +26,16 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.RecordIdentifier; -import org.apache.hadoop.hive.ql.metadata.VirtualColumn; -import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import java.io.IOException; -import java.util.ArrayDeque; 
-import java.util.ArrayList; import java.util.Arrays; -import java.util.Deque; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -57,6 +51,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ private final Configuration conf; private final boolean collapse; private final RecordReader baseReader; + private final ObjectInspector objectInspector; private final long offset; private final long length; private final ValidTxnList validTxnList; @@ -443,6 +438,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ this.offset = options.getOffset(); this.length = options.getLength(); this.validTxnList = validTxnList; + + TypeDescription typeDescr = OrcUtils.getDesiredRowTypeDescr(conf); + if (typeDescr == null) { + throw new IOException(ErrorMsg.SCHEMA_REQUIRED_TO_READ_ACID_TABLES.getErrorCodedMsg()); + } + + objectInspector = OrcRecordUpdater.createEventSchema + (OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(typeDescr))); + // modify the options to reflect the event instead of the base row Reader.Options eventOptions = createEventOptions(options); if (reader == null) { @@ -675,46 +679,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ @Override public ObjectInspector getObjectInspector() { - // Read the configuration parameters - String columnNameProperty = conf.get(serdeConstants.LIST_COLUMNS); - // NOTE: if "columns.types" is missing, all columns will be of String type - String columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES); - - // Parse the configuration parameters - ArrayList<String> columnNames = new ArrayList<String>(); - Deque<Integer> virtualColumns = new ArrayDeque<Integer>(); - if (columnNameProperty != null && columnNameProperty.length() > 0) { - String[] colNames = columnNameProperty.split(","); - for (int i = 0; i < colNames.length; i++) { - if (VirtualColumn.VIRTUAL_COLUMN_NAMES.contains(colNames[i])) { - virtualColumns.addLast(i); - } else { - columnNames.add(colNames[i]); - } - } - } - if (columnTypeProperty == null) { - // Default type: all string - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < columnNames.size(); i++) { - if (i > 0) { - sb.append(":"); - } - sb.append("string"); - } - columnTypeProperty = sb.toString(); - } - - ArrayList<TypeInfo> fieldTypes = - TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); - while (virtualColumns.size() > 0) { - fieldTypes.remove(virtualColumns.removeLast()); - } - StructTypeInfo rowType = new StructTypeInfo(); - rowType.setAllStructFieldNames(columnNames); - rowType.setAllStructFieldTypeInfos(fieldTypes); - return OrcRecordUpdater.createEventSchema - (OrcStruct.createObjectInspector(rowType)); + return objectInspector; } @Override
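[Editor's note] The OrcInputFormat hunks above replace footer-driven column selection with a row schema pushed down through the job configuration ("schema on read"). The fragment below is a minimal sketch, not the committed code, of how those pieces fit together inside OrcInputFormat; it assumes the package-local helpers genIncludedColumns and setSearchArgument shown in the diff are in scope, and the isOriginal flag is illustrative (the real code derives it per file/split).

    // Sketch only: mirrors the patched reader-options flow in OrcInputFormat
    // (same package, so the static helpers from the diff are in scope).
    static Reader.Options optionsFromConf(Configuration conf, long offset, long length) {
      // Desired row type that the planner published in the configuration.
      TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf);
      Reader.Options options = new Reader.Options()
          .schema(schema)
          .range(offset, length);
      // Included columns and the SearchArgument are still keyed off flat
      // OrcProto.Type lists, hence the TODO in the diff about converting
      // genIncludedColumns and setSearchArgument to use TypeDescription.
      List<OrcProto.Type> schemaTypes = OrcUtils.getOrcTypes(schema);
      boolean isOriginal = true;  // illustrative; see SCHEMA_TYPES_IS_ORIGINAL in the diff
      options.include(genIncludedColumns(schemaTypes, conf, isOriginal));
      setSearchArgument(options, schemaTypes, conf, isOriginal);
      return options;
    }

The practical effect is that ACID readers no longer open a base or delta file just to discover the row type before building include masks and predicate push-down.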
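[Editor's note] With convertTypeInfo relocated from OrcOutputFormat to OrcUtils, both OrcFile.WriterOptions.inspector(...) and the ORC record writer now build the file schema through the shared helper. A minimal sketch of the writer-side usage follows; the method and variable names are illustrative, but the calls match the OrcRecordWriter hunk above.

    // Sketch only: fold the table's column names and TypeInfos into a struct
    // TypeDescription, as the patched OrcRecordWriter does.
    static TypeDescription buildWriterSchema(List<String> columnNames,
                                             List<TypeInfo> columnTypes) {
      TypeDescription schema = TypeDescription.createStruct();
      for (int i = 0; i < columnNames.size(); ++i) {
        // e.g. a decimal(10,2) column becomes
        // TypeDescription.createDecimal().withPrecision(10).withScale(2)
        schema.addField(columnNames.get(i),
            OrcUtils.convertTypeInfo(columnTypes.get(i)));
      }
      return schema;
    }

Nested types (list, map, union, struct) recurse through the same conversion, as the deleted OrcOutputFormat.convertTypeInfo body shows.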
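[Editor's note] The OrcRawRecordMerger change moves schema resolution to construction time; getObjectInspector() no longer rebuilds the row type from the columns / columns.types serde properties. A condensed view of the added constructor logic, essentially restating the '+' lines in the diff with comments:

    // Resolve the desired row type once; ACID reads now fail fast without it.
    TypeDescription typeDescr = OrcUtils.getDesiredRowTypeDescr(conf);
    if (typeDescr == null) {
      throw new IOException(ErrorMsg.SCHEMA_REQUIRED_TO_READ_ACID_TABLES.getErrorCodedMsg());
    }
    // Wrap the row ObjectInspector in the ACID event schema (operation and
    // transaction/bucket/row-id metadata around the row itself).
    objectInspector = OrcRecordUpdater.createEventSchema(
        OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(typeDescr)));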
