http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java index 8ee8cd7..8bb32ea 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java @@ -24,7 +24,6 @@ import java.sql.Timestamp; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -36,12 +35,9 @@ import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampUtils; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr; import org.apache.hadoop.hive.serde2.io.ByteWritable; @@ -60,7 +56,8 @@ import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.apache.orc.TypeDescription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.orc.impl.BitFieldReader; import org.apache.orc.impl.DynamicByteArray; import org.apache.orc.impl.InStream; @@ -78,6 +75,60 @@ import org.apache.orc.impl.StreamName; */ public class TreeReaderFactory { + private static final Logger LOG = + LoggerFactory.getLogger(TreeReaderFactory.class); + + public static class TreeReaderSchema { + + /** + * The types in the ORC file. + */ + List<OrcProto.Type> fileTypes; + + /** + * The treeReaderSchema that the reader should read as. + */ + List<OrcProto.Type> schemaTypes; + + /** + * The subtype of the row STRUCT. Different than 0 for ACID. + */ + int innerStructSubtype; + + public TreeReaderSchema() { + fileTypes = null; + schemaTypes = null; + innerStructSubtype = -1; + } + + public TreeReaderSchema fileTypes(List<OrcProto.Type> fileTypes) { + this.fileTypes = fileTypes; + return this; + } + + public TreeReaderSchema schemaTypes(List<OrcProto.Type> schemaTypes) { + this.schemaTypes = schemaTypes; + return this; + } + + public TreeReaderSchema innerStructSubtype(int innerStructSubtype) { + this.innerStructSubtype = innerStructSubtype; + return this; + } + + public List<OrcProto.Type> getFileTypes() { + return fileTypes; + } + + public List<OrcProto.Type> getSchemaTypes() { + return schemaTypes; + } + + public int getInnerStructSubtype() { + return innerStructSubtype; + } + } + public abstract static class TreeReader { protected final int columnId; protected BitFieldReader present = null; @@ -179,60 +230,36 @@ public class TreeReaderFactory { } /** - * Called at the top level to read into the given batch. - * @param batch the batch to read into - * @param batchSize the number of rows to read - * @throws IOException - */ - public void nextBatch(VectorizedRowBatch batch, - int batchSize) throws IOException { - batch.cols[0].reset(); - batch.cols[0].ensureSize(batchSize, false); - nextVector(batch.cols[0], null, batchSize); - } - - /** * Populates the isNull vector array in the previousVector object based on * the present stream values. This function is called from all the child * readers, and they all set the values based on isNull field value. * - * @param previous The columnVector object whose isNull value is populated - * @param isNull Whether the each value was null at a higher level. If - * isNull is null, all values are non-null. + * @param previousVector The columnVector object whose isNull value is populated * @param batchSize Size of the column vector + * @return next column vector * @throws IOException */ - public void nextVector(ColumnVector previous, - boolean[] isNull, - int batchSize) throws IOException { - if (present != null || isNull != null) { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { + ColumnVector result = (ColumnVector) previousVector; + if (present != null) { // Set noNulls and isNull vector of the ColumnVector based on // present stream - previous.noNulls = true; - boolean allNull = true; + result.noNulls = true; for (int i = 0; i < batchSize; i++) { - if (isNull == null || !isNull[i]) { - if (present != null && present.next() != 1) { - previous.noNulls = false; - previous.isNull[i] = true; - } else { - previous.isNull[i] = false; - allNull = false; - } - } else { - previous.noNulls = false; - previous.isNull[i] = true; + result.isNull[i] = (present.next() != 1); + if (result.noNulls && result.isNull[i]) { + result.noNulls = false; } } - previous.isRepeating = !previous.noNulls && allNull; } else { - // There is no present stream, this means that all the values are + // There is not present stream, this means that all the values are // present. - previous.noNulls = true; + result.noNulls = true; for (int i = 0; i < batchSize; i++) { - previous.isNull[i] = false; + result.isNull[i] = false; } } + return previousVector; } public BitFieldReader getPresent() { @@ -240,46 +267,6 @@ public class TreeReaderFactory { } } - public static class NullTreeReader extends TreeReader { - - public NullTreeReader(int columnId) throws IOException { - super(columnId); - } - - @Override - public void startStripe(Map<StreamName, InStream> streams, - OrcProto.StripeFooter footer) { - // PASS - } - - @Override - void skipRows(long rows) { - // PASS - } - - @Override - public void seek(PositionProvider position) { - // PASS - } - - @Override - public void seek(PositionProvider[] position) { - // PASS - } - - @Override - Object next(Object previous) { - return null; - } - - @Override - public void nextVector(ColumnVector vector, boolean[] isNull, int size) { - vector.noNulls = false; - vector.isNull[0] = true; - vector.isRepeating = true; - } - } - public static class BooleanTreeReader extends TreeReader { protected BitFieldReader reader = null; @@ -335,16 +322,20 @@ public class TreeReaderFactory { } @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - int batchSize) throws IOException { - LongColumnVector result = (LongColumnVector) previousVector; + public Object nextVector(Object previousVector, final int batchSize) throws IOException { + final LongColumnVector result; + if (previousVector == null) { + result = new LongColumnVector(); + } else { + result = (LongColumnVector) previousVector; + } // Read present/isNull stream - super.nextVector(result, isNull, batchSize); + super.nextVector(result, batchSize); // Read value entries based on isNull entries reader.nextVector(result, batchSize); + return result; } } @@ -396,16 +387,20 @@ public class TreeReaderFactory { } @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - int batchSize) throws IOException { - final LongColumnVector result = (LongColumnVector) previousVector; + public Object nextVector(Object previousVector, final int batchSize) throws IOException { + final LongColumnVector result; + if (previousVector == null) { + result = new LongColumnVector(); + } else { + result = (LongColumnVector) previousVector; + } // Read present/isNull stream - super.nextVector(result, isNull, batchSize); + super.nextVector(result, batchSize); // Read value entries based on isNull entries - reader.nextVector(result, result.vector, batchSize); + reader.nextVector(result, batchSize); + return result; } @Override @@ -478,16 +473,20 @@ public class TreeReaderFactory { } @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - int batchSize) throws IOException { - final LongColumnVector result = (LongColumnVector) previousVector; + public Object nextVector(Object previousVector, final int batchSize) throws IOException { + final LongColumnVector result; + if (previousVector == null) { + result = new LongColumnVector(); + } else { + result = (LongColumnVector) previousVector; + } // Read present/isNull stream - super.nextVector(result, isNull, batchSize); + super.nextVector(result, batchSize); // Read value entries based on isNull entries - reader.nextVector(result, result.vector, batchSize); + reader.nextVector(result, batchSize); + return result; } @Override @@ -560,16 +559,20 @@ public class TreeReaderFactory { } @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - int batchSize) throws IOException { - final LongColumnVector result = (LongColumnVector) previousVector; + public Object nextVector(Object previousVector, final int batchSize) throws IOException { + final LongColumnVector result; + if (previousVector == null) { + result = new LongColumnVector(); + } else { + result = (LongColumnVector) previousVector; + } // Read present/isNull stream - super.nextVector(result, isNull, batchSize); + super.nextVector(result, batchSize); // Read value entries based on isNull entries - reader.nextVector(result, result.vector, batchSize); + reader.nextVector(result, batchSize); + return result; } @Override @@ -643,16 +646,20 @@ public class TreeReaderFactory { } @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - int batchSize) throws IOException { - final LongColumnVector result = (LongColumnVector) previousVector; + public Object nextVector(Object previousVector, final int batchSize) throws IOException { + final LongColumnVector result; + if (previousVector == null) { + result = new LongColumnVector(); + } else { + result = (LongColumnVector) previousVector; + } // Read present/isNull stream - super.nextVector(result, isNull, batchSize); + super.nextVector(result, batchSize); // Read value entries based on isNull entries - reader.nextVector(result, result.vector, batchSize); + reader.nextVector(result, batchSize); + return result; } @Override @@ -712,13 +719,16 @@ public class TreeReaderFactory { } @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - int batchSize) throws IOException { - final DoubleColumnVector result = (DoubleColumnVector) previousVector; + public Object nextVector(Object previousVector, final int batchSize) throws IOException { + final DoubleColumnVector result; + if (previousVector == null) { + result = new DoubleColumnVector(); + } else { + result = (DoubleColumnVector) previousVector; + } // Read present/isNull stream - super.nextVector(result, isNull, batchSize); + super.nextVector(result, batchSize); final boolean hasNulls = !result.noNulls; boolean allNulls = hasNulls; @@ -758,6 +768,7 @@ public class TreeReaderFactory { } result.isRepeating = repeating; } + return result; } @Override @@ -821,13 +832,16 @@ public class TreeReaderFactory { } @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - int batchSize) throws IOException { - final DoubleColumnVector result = (DoubleColumnVector) previousVector; + public Object nextVector(Object previousVector, final int batchSize) throws IOException { + final DoubleColumnVector result; + if (previousVector == null) { + result = new DoubleColumnVector(); + } else { + result = (DoubleColumnVector) previousVector; + } // Read present/isNull stream - super.nextVector(result, isNull, batchSize); + super.nextVector(result, batchSize); final boolean hasNulls = !result.noNulls; boolean allNulls = hasNulls; @@ -867,6 +881,8 @@ public class TreeReaderFactory { } result.isRepeating = repeating; } + + return result; } @Override @@ -958,15 +974,19 @@ public class TreeReaderFactory { } @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - int batchSize) throws IOException { - final BytesColumnVector result = (BytesColumnVector) previousVector; + public Object nextVector(Object previousVector, final int batchSize) throws IOException { + final BytesColumnVector result; + if (previousVector == null) { + result = new BytesColumnVector(); + } else { + result = (BytesColumnVector) previousVector; + } // Read present/isNull stream - super.nextVector(result, isNull, batchSize); + super.nextVector(result, batchSize); BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv, result, batchSize); + return result; } @Override @@ -991,6 +1011,7 @@ public class TreeReaderFactory { private final TimeZone readerTimeZone; private TimeZone writerTimeZone; private boolean hasSameTZRules; + private TimestampWritable scratchTimestampWritable; TimestampTreeReader(int columnId, boolean skipCorrupt) throws IOException { this(columnId, null, null, null, null, skipCorrupt); @@ -1094,9 +1115,9 @@ public class TreeReaderFactory { int newNanos = parseNanos(nanos.next()); // fix the rounding when we divided by 1000. if (millis >= 0) { - millis += newNanos / WriterImpl.NANOS_PER_MILLI; + millis += newNanos / 1000000; } else { - millis -= newNanos / WriterImpl.NANOS_PER_MILLI; + millis -= newNanos / 1000000; } long offset = 0; // If reader and writer time zones have different rules, adjust the timezone difference @@ -1123,45 +1144,31 @@ public class TreeReaderFactory { } @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - int batchSize) throws IOException { - TimestampColumnVector result = (TimestampColumnVector) previousVector; - super.nextVector(previousVector, isNull, batchSize); + public Object nextVector(Object previousVector, final int batchSize) throws IOException { + final TimestampColumnVector result; + if (previousVector == null) { + result = new TimestampColumnVector(); + } else { + result = (TimestampColumnVector) previousVector; + } + result.reset(); + if (scratchTimestampWritable == null) { + scratchTimestampWritable = new TimestampWritable(); + } + Object obj; for (int i = 0; i < batchSize; i++) { - if (result.noNulls || !result.isNull[i]) { - long millis = data.next() + base_timestamp; - int newNanos = parseNanos(nanos.next()); - if (millis < 0 && newNanos != 0) { - millis -= 1; - } - millis *= WriterImpl.MILLIS_PER_SECOND; - long offset = 0; - // If reader and writer time zones have different rules, adjust the timezone difference - // between reader and writer taking day light savings into account. - if (!hasSameTZRules) { - offset = writerTimeZone.getOffset(millis) - readerTimeZone.getOffset(millis); - } - long adjustedMillis = millis + offset; - // Sometimes the reader timezone might have changed after adding the adjustedMillis. - // To account for that change, check for any difference in reader timezone after - // adding adjustedMillis. If so use the new offset (offset at adjustedMillis point of time). - if (!hasSameTZRules && - (readerTimeZone.getOffset(millis) != readerTimeZone.getOffset(adjustedMillis))) { - long newOffset = - writerTimeZone.getOffset(millis) - readerTimeZone.getOffset(adjustedMillis); - adjustedMillis = millis + newOffset; - } - result.time[i] = adjustedMillis; - result.nanos[i] = newNanos; - if (result.isRepeating && i != 0 && - (result.time[0] != result.time[i] || - result.nanos[0] != result.nanos[i])) { - result.isRepeating = false; - } + obj = next(scratchTimestampWritable); + if (obj == null) { + result.noNulls = false; + result.isNull[i] = true; + } else { + TimestampWritable writable = (TimestampWritable) obj; + result.set(i, writable.getTimestamp()); } } + + return result; } private static int parseNanos(long serialized) { @@ -1246,16 +1253,20 @@ public class TreeReaderFactory { } @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - int batchSize) throws IOException { - final LongColumnVector result = (LongColumnVector) previousVector; + public Object nextVector(Object previousVector, final int batchSize) throws IOException { + final LongColumnVector result; + if (previousVector == null) { + result = new LongColumnVector(); + } else { + result = (LongColumnVector) previousVector; + } // Read present/isNull stream - super.nextVector(result, isNull, batchSize); + super.nextVector(result, batchSize); // Read value entries based on isNull entries - reader.nextVector(result, result.vector, batchSize); + reader.nextVector(result, batchSize); + return result; } @Override @@ -1267,7 +1278,7 @@ public class TreeReaderFactory { public static class DecimalTreeReader extends TreeReader { protected InStream valueStream; protected IntegerReader scaleReader = null; - private int[] scratchScaleVector; + private LongColumnVector scratchScaleVector; private final int precision; private final int scale; @@ -1282,7 +1293,7 @@ public class TreeReaderFactory { super(columnId, present); this.precision = precision; this.scale = scale; - this.scratchScaleVector = new int[VectorizedRowBatch.DEFAULT_SIZE]; + this.scratchScaleVector = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE); this.valueStream = valueStream; if (scaleStream != null && encoding != null) { checkEncoding(encoding); @@ -1341,34 +1352,46 @@ public class TreeReaderFactory { } @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - int batchSize) throws IOException { - final DecimalColumnVector result = (DecimalColumnVector) previousVector; + public Object nextVector(Object previousVector, final int batchSize) throws IOException { + final DecimalColumnVector result; + if (previousVector == null) { + result = new DecimalColumnVector(precision, scale); + } else { + result = (DecimalColumnVector) previousVector; + } + + // Save the reference for isNull in the scratch vector + boolean[] scratchIsNull = scratchScaleVector.isNull; // Read present/isNull stream - super.nextVector(result, isNull, batchSize); + super.nextVector(result, batchSize); - if (batchSize > scratchScaleVector.length) { - scratchScaleVector = new int[(int) batchSize]; - } - scaleReader.nextVector(result, scratchScaleVector, batchSize); // Read value entries based on isNull entries - if (result.noNulls) { - for (int r=0; r < batchSize; ++r) { + if (result.isRepeating) { + if (!result.isNull[0]) { BigInteger bInt = SerializationUtils.readBigInteger(valueStream); - HiveDecimal dec = HiveDecimal.create(bInt, scratchScaleVector[r]); - result.set(r, dec); + short scaleInData = (short) scaleReader.next(); + HiveDecimal dec = HiveDecimal.create(bInt, scaleInData); + dec = HiveDecimal.enforcePrecisionScale(dec, precision, scale); + result.set(0, dec); } - } else if (!result.isRepeating || !result.isNull[0]) { - for (int r=0; r < batchSize; ++r) { - if (!result.isNull[r]) { + } else { + // result vector has isNull values set, use the same to read scale vector. + scratchScaleVector.isNull = result.isNull; + scaleReader.nextVector(scratchScaleVector, batchSize); + for (int i = 0; i < batchSize; i++) { + if (!result.isNull[i]) { BigInteger bInt = SerializationUtils.readBigInteger(valueStream); - HiveDecimal dec = HiveDecimal.create(bInt, scratchScaleVector[r]); - result.set(r, dec); + short scaleInData = (short) scratchScaleVector.vector[i]; + HiveDecimal dec = HiveDecimal.create(bInt, scaleInData); + dec = HiveDecimal.enforcePrecisionScale(dec, precision, scale); + result.set(i, dec); } } } + // Switch back the null vector. + scratchScaleVector.isNull = scratchIsNull; + return result; } @Override @@ -1458,10 +1481,8 @@ public class TreeReaderFactory { } @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - int batchSize) throws IOException { - reader.nextVector(previousVector, isNull, batchSize); + public Object nextVector(Object previousVector, final int batchSize) throws IOException { + return reader.nextVector(previousVector, batchSize); } @Override @@ -1480,7 +1501,7 @@ public class TreeReaderFactory { BytesColumnVector result, final int batchSize) throws IOException { // Read lengths scratchlcv.isNull = result.isNull; // Notice we are replacing the isNull vector here... - lengths.nextVector(scratchlcv, scratchlcv.vector, batchSize); + lengths.nextVector(scratchlcv, batchSize); int totalLength = 0; if (!scratchlcv.isRepeating) { for (int i = 0; i < batchSize; i++) { @@ -1511,35 +1532,31 @@ public class TreeReaderFactory { } // This method has the common code for reading in bytes into a BytesColumnVector. - public static void readOrcByteArrays(InStream stream, - IntegerReader lengths, - LongColumnVector scratchlcv, - BytesColumnVector result, - int batchSize) throws IOException { - if (result.noNulls || !(result.isRepeating && result.isNull[0])) { - byte[] allBytes = commonReadByteArrays(stream, lengths, scratchlcv, - result, (int) batchSize); - - // Too expensive to figure out 'repeating' by comparisons. - result.isRepeating = false; - int offset = 0; - if (!scratchlcv.isRepeating) { - for (int i = 0; i < batchSize; i++) { - if (!scratchlcv.isNull[i]) { - result.setRef(i, allBytes, offset, (int) scratchlcv.vector[i]); - offset += scratchlcv.vector[i]; - } else { - result.setRef(i, allBytes, 0, 0); - } + public static void readOrcByteArrays(InStream stream, IntegerReader lengths, + LongColumnVector scratchlcv, + BytesColumnVector result, final int batchSize) throws IOException { + + byte[] allBytes = commonReadByteArrays(stream, lengths, scratchlcv, result, batchSize); + + // Too expensive to figure out 'repeating' by comparisons. + result.isRepeating = false; + int offset = 0; + if (!scratchlcv.isRepeating) { + for (int i = 0; i < batchSize; i++) { + if (!scratchlcv.isNull[i]) { + result.setRef(i, allBytes, offset, (int) scratchlcv.vector[i]); + offset += scratchlcv.vector[i]; + } else { + result.setRef(i, allBytes, 0, 0); } - } else { - for (int i = 0; i < batchSize; i++) { - if (!scratchlcv.isNull[i]) { - result.setRef(i, allBytes, offset, (int) scratchlcv.vector[0]); - offset += scratchlcv.vector[0]; - } else { - result.setRef(i, allBytes, 0, 0); - } + } + } else { + for (int i = 0; i < batchSize; i++) { + if (!scratchlcv.isNull[i]) { + result.setRef(i, allBytes, offset, (int) scratchlcv.vector[0]); + offset += scratchlcv.vector[0]; + } else { + result.setRef(i, allBytes, 0, 0); } } } @@ -1624,16 +1641,19 @@ public class TreeReaderFactory { } @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - int batchSize) throws IOException { - final BytesColumnVector result = (BytesColumnVector) previousVector; + public Object nextVector(Object previousVector, final int batchSize) throws IOException { + final BytesColumnVector result; + if (previousVector == null) { + result = new BytesColumnVector(); + } else { + result = (BytesColumnVector) previousVector; + } // Read present/isNull stream - super.nextVector(result, isNull, batchSize); + super.nextVector(result, batchSize); - BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv, - result, batchSize); + BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv, result, batchSize); + return result; } @Override @@ -1796,15 +1816,18 @@ public class TreeReaderFactory { } @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - int batchSize) throws IOException { - final BytesColumnVector result = (BytesColumnVector) previousVector; + public Object nextVector(Object previousVector, final int batchSize) throws IOException { + final BytesColumnVector result; int offset; int length; + if (previousVector == null) { + result = new BytesColumnVector(); + } else { + result = (BytesColumnVector) previousVector; + } // Read present/isNull stream - super.nextVector(result, isNull, batchSize); + super.nextVector(result, batchSize); if (dictionaryBuffer != null) { @@ -1815,8 +1838,7 @@ public class TreeReaderFactory { // Read string offsets scratchlcv.isNull = result.isNull; - scratchlcv.ensureSize((int) batchSize, false); - reader.nextVector(scratchlcv, scratchlcv.vector, batchSize); + reader.nextVector(scratchlcv, batchSize); if (!scratchlcv.isRepeating) { // The vector has non-repeating strings. Iterate thru the batch @@ -1856,6 +1878,7 @@ public class TreeReaderFactory { } } } + return result; } int getDictionaryEntryLength(int entry, int offset) { @@ -1913,13 +1936,11 @@ public class TreeReaderFactory { } @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - int batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { // Get the vector of strings from StringTreeReader, then make a 2nd pass to // adjust down the length (right trim and truncate) if necessary. - super.nextVector(previousVector, isNull, batchSize); - BytesColumnVector result = (BytesColumnVector) previousVector; + BytesColumnVector result = (BytesColumnVector) super.nextVector(previousVector, batchSize); + int adjustedDownLen; if (result.isRepeating) { if (result.noNulls || !result.isNull[0]) { @@ -1952,6 +1973,7 @@ public class TreeReaderFactory { } } } + return result; } } @@ -1988,13 +2010,10 @@ public class TreeReaderFactory { } @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - int batchSize) throws IOException { + public Object nextVector(Object previousVector, final int batchSize) throws IOException { // Get the vector of strings from StringTreeReader, then make a 2nd pass to // adjust down the length (truncate) if necessary. - super.nextVector(previousVector, isNull, batchSize); - BytesColumnVector result = (BytesColumnVector) previousVector; + BytesColumnVector result = (BytesColumnVector) super.nextVector(previousVector, batchSize); int adjustedDownLen; if (result.isRepeating) { @@ -2026,26 +2045,62 @@ public class TreeReaderFactory { } } } + return result; } } protected static class StructTreeReader extends TreeReader { + private final int readColumnCount; + private final int resultColumnCount; protected final TreeReader[] fields; + private final String[] fieldNames; - protected StructTreeReader(int columnId, - TypeDescription readerSchema, - SchemaEvolution treeReaderSchema, - boolean[] included, - boolean skipCorrupt) throws IOException { + protected StructTreeReader( + int columnId, + TreeReaderSchema treeReaderSchema, + boolean[] included, + boolean skipCorrupt) throws IOException { super(columnId); - TypeDescription fileSchema = treeReaderSchema.getFileType(readerSchema); + OrcProto.Type fileStructType = treeReaderSchema.getFileTypes().get(columnId); + + OrcProto.Type schemaStructType = treeReaderSchema.getSchemaTypes().get(columnId); - List<TypeDescription> childrenTypes = readerSchema.getChildren(); - this.fields = new TreeReader[childrenTypes.size()]; - for (int i = 0; i < fields.length; ++i) { - TypeDescription subtype = childrenTypes.get(i); - this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt); + readColumnCount = Math.min(fileStructType.getFieldNamesCount(), schemaStructType.getFieldNamesCount()); + + if (columnId == treeReaderSchema.getInnerStructSubtype()) { + // If there are more result columns than reader columns, we will default those additional + // columns to NULL. + resultColumnCount = schemaStructType.getFieldNamesCount(); + } else { + resultColumnCount = readColumnCount; + } + + this.fields = new TreeReader[readColumnCount]; + this.fieldNames = new String[readColumnCount]; + + if (included == null) { + for (int i = 0; i < readColumnCount; ++i) { + int subtype = schemaStructType.getSubtypes(i); + this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt); + // Use the treeReaderSchema evolution name since file/reader types may not have the real column name. + this.fieldNames[i] = schemaStructType.getFieldNames(i); + } + } else { + for (int i = 0; i < readColumnCount; ++i) { + int subtype = schemaStructType.getSubtypes(i); + if (subtype >= included.length) { + throw new IOException("subtype " + subtype + " exceeds the included array size " + + included.length + " fileTypes " + treeReaderSchema.getFileTypes().toString() + + " schemaTypes " + treeReaderSchema.getSchemaTypes().toString() + + " innerStructSubtype " + treeReaderSchema.getInnerStructSubtype()); + } + if (included[subtype]) { + this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt); + } + // Use the treeReaderSchema evolution name since file/reader types may not have the real column name. + this.fieldNames[i] = schemaStructType.getFieldNames(i); + } } } @@ -2065,52 +2120,65 @@ public class TreeReaderFactory { OrcStruct result = null; if (valuePresent) { if (previous == null) { - result = new OrcStruct(fields.length); + result = new OrcStruct(resultColumnCount); } else { result = (OrcStruct) previous; // If the input format was initialized with a file with a // different number of fields, the number of fields needs to // be updated to the correct number - result.setNumFields(fields.length); + if (result.getNumFields() != resultColumnCount) { + result.setNumFields(resultColumnCount); + } } - for (int i = 0; i < fields.length; ++i) { + for (int i = 0; i < readColumnCount; ++i) { if (fields[i] != null) { result.setFieldValue(i, fields[i].next(result.getFieldValue(i))); } } + if (resultColumnCount > readColumnCount) { + for (int i = readColumnCount; i < resultColumnCount; ++i) { + // Default new treeReaderSchema evolution fields to NULL. + result.setFieldValue(i, null); + } + } } return result; } @Override - public void nextBatch(VectorizedRowBatch batch, - int batchSize) throws IOException { - for(int i=0; i < fields.length && - (vectorColumnCount == -1 || i < vectorColumnCount); ++i) { - batch.cols[i].reset(); - batch.cols[i].ensureSize((int) batchSize, false); - fields[i].nextVector(batch.cols[i], null, batchSize); + public Object nextVector(Object previousVector, final int batchSize) throws IOException { + final ColumnVector[] result; + if (previousVector == null) { + result = new ColumnVector[readColumnCount]; + } else { + result = (ColumnVector[]) previousVector; } - } - @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - int batchSize) throws IOException { - super.nextVector(previousVector, isNull, batchSize); - StructColumnVector result = (StructColumnVector) previousVector; - if (result.noNulls || !(result.isRepeating && result.isNull[0])) { - result.isRepeating = false; + // Read all the members of struct as column vectors + for (int i = 0; i < readColumnCount; i++) { + if (fields[i] != null) { + if (result[i] == null) { + result[i] = (ColumnVector) fields[i].nextVector(null, batchSize); + } else { + fields[i].nextVector(result[i], batchSize); + } + } + } - // Read all the members of struct as column vectors - boolean[] mask = result.noNulls ? null : result.isNull; - for (int f = 0; f < fields.length; f++) { - if (fields[f] != null) { - fields[f].nextVector(result.fields[f], mask, batchSize); + // Default additional treeReaderSchema evolution fields to NULL. + if (vectorColumnCount != -1 && vectorColumnCount > readColumnCount) { + for (int i = readColumnCount; i < vectorColumnCount; ++i) { + ColumnVector colVector = result[i]; + if (colVector != null) { + colVector.isRepeating = true; + colVector.noNulls = false; + colVector.isNull[0] = true; } } } + + return result; } @Override @@ -2140,18 +2208,19 @@ public class TreeReaderFactory { protected final TreeReader[] fields; protected RunLengthByteReader tags; - protected UnionTreeReader(int fileColumn, - TypeDescription readerSchema, - SchemaEvolution treeReaderSchema, - boolean[] included, - boolean skipCorrupt) throws IOException { - super(fileColumn); - List<TypeDescription> childrenTypes = readerSchema.getChildren(); - int fieldCount = childrenTypes.size(); + protected UnionTreeReader(int columnId, + TreeReaderSchema treeReaderSchema, + boolean[] included, + boolean skipCorrupt) throws IOException { + super(columnId); + OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId); + int fieldCount = type.getSubtypesCount(); this.fields = new TreeReader[fieldCount]; for (int i = 0; i < fieldCount; ++i) { - TypeDescription subtype = childrenTypes.get(i); - this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt); + int subtype = type.getSubtypes(i); + if (included == null || included[subtype]) { + this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt); + } } } @@ -2183,25 +2252,9 @@ public class TreeReaderFactory { } @Override - public void nextVector(ColumnVector previousVector, - boolean[] isNull, - int batchSize) throws IOException { - UnionColumnVector result = (UnionColumnVector) previousVector; - super.nextVector(result, isNull, batchSize); - if (result.noNulls || !(result.isRepeating && result.isNull[0])) { - result.isRepeating = false; - tags.nextVector(result.noNulls ? null : result.isNull, result.tags, - batchSize); - boolean[] ignore = new boolean[(int) batchSize]; - for (int f = 0; f < result.fields.length; ++f) { - // build the ignore list for this tag - for (int r = 0; r < batchSize; ++r) { - ignore[r] = (!result.noNulls && result.isNull[r]) || - result.tags[r] != f; - } - fields[f].nextVector(result.fields[f], ignore, batchSize); - } - } + public Object nextVector(Object previousVector, final int batchSize) throws IOException { + throw new UnsupportedOperationException( + "NextVector is not supported operation for Union type"); } @Override @@ -2235,15 +2288,13 @@ public class TreeReaderFactory { protected final TreeReader elementReader; protected IntegerReader lengths = null; - protected ListTreeReader(int fileColumn, - TypeDescription readerSchema, - SchemaEvolution treeReaderSchema, - boolean[] included, - boolean skipCorrupt) throws IOException { - super(fileColumn); - TypeDescription elementType = readerSchema.getChildren().get(0); - elementReader = createTreeReader(elementType, treeReaderSchema, included, - skipCorrupt); + protected ListTreeReader(int columnId, + TreeReaderSchema treeReaderSchema, + boolean[] included, + boolean skipCorrupt) throws IOException { + super(columnId); + OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId); + elementReader = createTreeReader(type.getSubtypes(0), treeReaderSchema, included, skipCorrupt); } @Override @@ -2284,27 +2335,9 @@ public class TreeReaderFactory { } @Override - public void nextVector(ColumnVector previous, - boolean[] isNull, - int batchSize) throws IOException { - ListColumnVector result = (ListColumnVector) previous; - super.nextVector(result, isNull, batchSize); - // if we have some none-null values, then read them - if (result.noNulls || !(result.isRepeating && result.isNull[0])) { - lengths.nextVector(result, result.lengths, batchSize); - // even with repeating lengths, the list doesn't repeat - result.isRepeating = false; - // build the offsets vector and figure out how many children to read - result.childCount = 0; - for (int r = 0; r < batchSize; ++r) { - if (result.noNulls || !result.isNull[r]) { - result.offsets[r] = result.childCount; - result.childCount += result.lengths[r]; - } - } - result.child.ensureSize(result.childCount, false); - elementReader.nextVector(result.child, null, result.childCount); - } + public Object nextVector(Object previous, final int batchSize) throws IOException { + throw new UnsupportedOperationException( + "NextVector is not supported operation for List type"); } @Override @@ -2345,16 +2378,24 @@ public class TreeReaderFactory { protected final TreeReader valueReader; protected IntegerReader lengths = null; - protected MapTreeReader(int fileColumn, - TypeDescription readerSchema, - SchemaEvolution treeReaderSchema, - boolean[] included, - boolean skipCorrupt) throws IOException { - super(fileColumn); - TypeDescription keyType = readerSchema.getChildren().get(0); - TypeDescription valueType = readerSchema.getChildren().get(1); - keyReader = createTreeReader(keyType, treeReaderSchema, included, skipCorrupt); - valueReader = createTreeReader(valueType, treeReaderSchema, included, skipCorrupt); + protected MapTreeReader(int columnId, + TreeReaderSchema treeReaderSchema, + boolean[] included, + boolean skipCorrupt) throws IOException { + super(columnId); + OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId); + int keyColumn = type.getSubtypes(0); + int valueColumn = type.getSubtypes(1); + if (included == null || included[keyColumn]) { + keyReader = createTreeReader(keyColumn, treeReaderSchema, included, skipCorrupt); + } else { + keyReader = null; + } + if (included == null || included[valueColumn]) { + valueReader = createTreeReader(valueColumn, treeReaderSchema, included, skipCorrupt); + } else { + valueReader = null; + } } @Override @@ -2388,28 +2429,9 @@ public class TreeReaderFactory { } @Override - public void nextVector(ColumnVector previous, - boolean[] isNull, - int batchSize) throws IOException { - MapColumnVector result = (MapColumnVector) previous; - super.nextVector(result, isNull, batchSize); - if (result.noNulls || !(result.isRepeating && result.isNull[0])) { - lengths.nextVector(result, result.lengths, batchSize); - // even with repeating lengths, the map doesn't repeat - result.isRepeating = false; - // build the offsets vector and figure out how many children to read - result.childCount = 0; - for (int r = 0; r < batchSize; ++r) { - if (result.noNulls || !result.isNull[r]) { - result.offsets[r] = result.childCount; - result.childCount += result.lengths[r]; - } - } - result.keys.ensureSize(result.childCount, false); - result.values.ensureSize(result.childCount, false); - keyReader.nextVector(result.keys, null, result.childCount); - valueReader.nextVector(result.values, null, result.childCount); - } + public Object nextVector(Object previous, final int batchSize) throws IOException { + throw new UnsupportedOperationException( + "NextVector is not supported operation for Map type"); } @Override @@ -2449,61 +2471,61 @@ public class TreeReaderFactory { } } - public static TreeReader createTreeReader(TypeDescription readerType, - SchemaEvolution evolution, - boolean[] included, - boolean skipCorrupt - ) throws IOException { - TypeDescription fileType = evolution.getFileType(readerType); - if (fileType == null || - (included != null && !included[readerType.getId()])) { - return new NullTreeReader(0); - } - switch (readerType.getCategory()) { + public static TreeReader createTreeReader(int columnId, + TreeReaderSchema treeReaderSchema, + boolean[] included, + boolean skipCorrupt + ) throws IOException { + OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId); + switch (type.getKind()) { case BOOLEAN: - return new BooleanTreeReader(fileType.getId()); + return new BooleanTreeReader(columnId); case BYTE: - return new ByteTreeReader(fileType.getId()); + return new ByteTreeReader(columnId); case DOUBLE: - return new DoubleTreeReader(fileType.getId()); + return new DoubleTreeReader(columnId); case FLOAT: - return new FloatTreeReader(fileType.getId()); + return new FloatTreeReader(columnId); case SHORT: - return new ShortTreeReader(fileType.getId()); + return new ShortTreeReader(columnId); case INT: - return new IntTreeReader(fileType.getId()); + return new IntTreeReader(columnId); case LONG: - return new LongTreeReader(fileType.getId(), skipCorrupt); + return new LongTreeReader(columnId, skipCorrupt); case STRING: - return new StringTreeReader(fileType.getId()); + return new StringTreeReader(columnId); case CHAR: - return new CharTreeReader(fileType.getId(), readerType.getMaxLength()); + if (!type.hasMaximumLength()) { + throw new IllegalArgumentException("ORC char type has no length specified"); + } + return new CharTreeReader(columnId, type.getMaximumLength()); case VARCHAR: - return new VarcharTreeReader(fileType.getId(), readerType.getMaxLength()); + if (!type.hasMaximumLength()) { + throw new IllegalArgumentException("ORC varchar type has no length specified"); + } + return new VarcharTreeReader(columnId, type.getMaximumLength()); case BINARY: - return new BinaryTreeReader(fileType.getId()); + return new BinaryTreeReader(columnId); case TIMESTAMP: - return new TimestampTreeReader(fileType.getId(), skipCorrupt); + return new TimestampTreeReader(columnId, skipCorrupt); case DATE: - return new DateTreeReader(fileType.getId()); + return new DateTreeReader(columnId); case DECIMAL: - return new DecimalTreeReader(fileType.getId(), readerType.getPrecision(), - readerType.getScale()); + int precision = + type.hasPrecision() ? type.getPrecision() : HiveDecimal.SYSTEM_DEFAULT_PRECISION; + int scale = type.hasScale() ? type.getScale() : HiveDecimal.SYSTEM_DEFAULT_SCALE; + return new DecimalTreeReader(columnId, precision, scale); case STRUCT: - return new StructTreeReader(fileType.getId(), readerType, - evolution, included, skipCorrupt); + return new StructTreeReader(columnId, treeReaderSchema, included, skipCorrupt); case LIST: - return new ListTreeReader(fileType.getId(), readerType, - evolution, included, skipCorrupt); + return new ListTreeReader(columnId, treeReaderSchema, included, skipCorrupt); case MAP: - return new MapTreeReader(fileType.getId(), readerType, evolution, - included, skipCorrupt); + return new MapTreeReader(columnId, treeReaderSchema, included, skipCorrupt); case UNION: - return new UnionTreeReader(fileType.getId(), readerType, - evolution, included, skipCorrupt); + return new UnionTreeReader(columnId, treeReaderSchema, included, skipCorrupt); default: throw new IllegalArgumentException("Unsupported type " + - readerType.getCategory()); + type.getKind()); } } }
http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java index e4d2e6e..816b52d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java @@ -71,29 +71,14 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect OrcInputFormat.raiseAcidTablesMustBeReadWithAcidReaderException(conf); } - rbCtx = Utilities.getVectorizedRowBatchCtx(conf); /** * Do we have schema on read in the configuration variables? */ - List<OrcProto.Type> types = file.getTypes(); - int dataColumns = rbCtx.getDataColumnCount(); - TypeDescription schema = - OrcInputFormat.getDesiredRowTypeDescr(conf, false, dataColumns); - if (schema == null) { - schema = file.getSchema(); - // Even if the user isn't doing schema evolution, cut the schema - // to the desired size. - if (schema.getCategory() == TypeDescription.Category.STRUCT && - schema.getChildren().size() > dataColumns) { - schema = schema.clone(); - List<TypeDescription> children = schema.getChildren(); - for(int c = children.size() - 1; c >= dataColumns; --c) { - children.remove(c); - } - } - } - Reader.Options options = new Reader.Options().schema(schema); + TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr(conf, /* isAcidRead */ false); + List<OrcProto.Type> types = file.getTypes(); + Reader.Options options = new Reader.Options(); + options.schema(schema); this.offset = fileSplit.getStart(); this.length = fileSplit.getLength(); options.range(offset, length); @@ -102,6 +87,8 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect this.reader = file.rowsOptions(options); + rbCtx = Utilities.getVectorizedRowBatchCtx(conf); + columnsToIncludeTruncated = rbCtx.getColumnsToIncludeTruncated(conf); int partitionColumnCount = rbCtx.getPartitionColumnCount(); @@ -116,6 +103,9 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect @Override public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException { + if (!reader.hasNext()) { + return false; + } try { // Check and update partition cols if necessary. Ideally, this should be done // in CreateValue as the partition is constant per split. But since Hive uses @@ -128,9 +118,7 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect } addPartitionCols = false; } - if (!reader.nextBatch(value)) { - return false; - } + reader.nextBatch(value); } catch (Exception e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java index 8e52907..70fe803 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java @@ -101,6 +101,8 @@ public class WriterImpl extends org.apache.orc.impl.WriterImpl implements Writer } } + private static final long NANOS_PER_MILLI = 1000000; + /** * Set the value for a given column value within a batch. * @param rowId the row to set http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestTypeDescription.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestTypeDescription.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestTypeDescription.java index 96af65a..2a82092 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestTypeDescription.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestTypeDescription.java @@ -51,11 +51,11 @@ public class TestTypeDescription { .addField("f4", TypeDescription.createDouble()) .addField("f5", TypeDescription.createBoolean())) .addField("f6", TypeDescription.createChar().withMaxLength(100)); - assertEquals("struct<f1:uniontype<tinyint,decimal(20,10)>,f2:struct<f3:date,f4:double,f5:boolean>,f6:char(100)>", + assertEquals("struct<f1:union<tinyint,decimal(20,10)>,f2:struct<f3:date,f4:double,f5:boolean>,f6:char(100)>", struct.toString()); assertEquals( "{\"category\": \"struct\", \"id\": 0, \"max\": 8, \"fields\": [\n" + - " \"f1\": {\"category\": \"uniontype\", \"id\": 1, \"max\": 3, \"children\": [\n" + + " \"f1\": {\"category\": \"union\", \"id\": 1, \"max\": 3, \"children\": [\n" + " {\"category\": \"tinyint\", \"id\": 2, \"max\": 2},\n" + " {\"category\": \"decimal\", \"id\": 3, \"max\": 3, \"precision\": 20, \"scale\": 10}]},\n" + " \"f2\": {\"category\": \"struct\", \"id\": 4, \"max\": 7, \"fields\": [\n" +
