http://git-wip-us.apache.org/repos/asf/hive/blob/0dd4621f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java index 8bb32ea..8ee8cd7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java @@ -24,6 +24,7 @@ import java.sql.Timestamp; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -35,9 +36,12 @@ import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.TimestampUtils; +import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr; import org.apache.hadoop.hive.serde2.io.ByteWritable; @@ -56,8 +60,7 @@ import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.orc.TypeDescription; import org.apache.orc.impl.BitFieldReader; import org.apache.orc.impl.DynamicByteArray; import org.apache.orc.impl.InStream; @@ -75,60 +78,6 @@ import org.apache.orc.impl.StreamName; */ public class TreeReaderFactory { - private static final Logger LOG = - LoggerFactory.getLogger(TreeReaderFactory.class); - - public static class TreeReaderSchema { - - /** - * The types in the ORC file. - */ - List<OrcProto.Type> fileTypes; - - /** - * The treeReaderSchema that the reader should read as. - */ - List<OrcProto.Type> schemaTypes; - - /** - * The subtype of the row STRUCT. Different than 0 for ACID. - */ - int innerStructSubtype; - - public TreeReaderSchema() { - fileTypes = null; - schemaTypes = null; - innerStructSubtype = -1; - } - - public TreeReaderSchema fileTypes(List<OrcProto.Type> fileTypes) { - this.fileTypes = fileTypes; - return this; - } - - public TreeReaderSchema schemaTypes(List<OrcProto.Type> schemaTypes) { - this.schemaTypes = schemaTypes; - return this; - } - - public TreeReaderSchema innerStructSubtype(int innerStructSubtype) { - this.innerStructSubtype = innerStructSubtype; - return this; - } - - public List<OrcProto.Type> getFileTypes() { - return fileTypes; - } - - public List<OrcProto.Type> getSchemaTypes() { - return schemaTypes; - } - - public int getInnerStructSubtype() { - return innerStructSubtype; - } - } - public abstract static class TreeReader { protected final int columnId; protected BitFieldReader present = null; @@ -230,36 +179,60 @@ public class TreeReaderFactory { } /** + * Called at the top level to read into the given batch. 
+ * @param batch the batch to read into + * @param batchSize the number of rows to read + * @throws IOException + */ + public void nextBatch(VectorizedRowBatch batch, + int batchSize) throws IOException { + batch.cols[0].reset(); + batch.cols[0].ensureSize(batchSize, false); + nextVector(batch.cols[0], null, batchSize); + } + + /** * Populates the isNull vector array in the previousVector object based on * the present stream values. This function is called from all the child * readers, and they all set the values based on isNull field value. * - * @param previousVector The columnVector object whose isNull value is populated + * @param previous The columnVector object whose isNull value is populated + * @param isNull Whether each value was null at a higher level. If + * isNull is null, all values are non-null. * @param batchSize Size of the column vector - * @return next column vector * @throws IOException */ - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - ColumnVector result = (ColumnVector) previousVector; - if (present != null) { + public void nextVector(ColumnVector previous, + boolean[] isNull, + int batchSize) throws IOException { + if (present != null || isNull != null) { // Set noNulls and isNull vector of the ColumnVector based on // present stream - result.noNulls = true; + previous.noNulls = true; + boolean allNull = true; for (int i = 0; i < batchSize; i++) { - result.isNull[i] = (present.next() != 1); - if (result.noNulls && result.isNull[i]) { - result.noNulls = false; + if (isNull == null || !isNull[i]) { + if (present != null && present.next() != 1) { + previous.noNulls = false; + previous.isNull[i] = true; + } else { + previous.isNull[i] = false; + allNull = false; + } + } else { + previous.noNulls = false; + previous.isNull[i] = true; } } + previous.isRepeating = !previous.noNulls && allNull; } else { - // There is not present stream, this means that all the values are + // There is no present stream, this means that all the values are + present. 
- result.noNulls = true; + previous.noNulls = true; for (int i = 0; i < batchSize; i++) { - result.isNull[i] = false; + previous.isNull[i] = false; } } - return previousVector; } public BitFieldReader getPresent() { @@ -267,6 +240,46 @@ public class TreeReaderFactory { } } + public static class NullTreeReader extends TreeReader { + + public NullTreeReader(int columnId) throws IOException { + super(columnId); + } + + @Override + public void startStripe(Map<StreamName, InStream> streams, + OrcProto.StripeFooter footer) { + // PASS + } + + @Override + void skipRows(long rows) { + // PASS + } + + @Override + public void seek(PositionProvider position) { + // PASS + } + + @Override + public void seek(PositionProvider[] position) { + // PASS + } + + @Override + Object next(Object previous) { + return null; + } + + @Override + public void nextVector(ColumnVector vector, boolean[] isNull, int size) { + vector.noNulls = false; + vector.isNull[0] = true; + vector.isRepeating = true; + } + } + public static class BooleanTreeReader extends TreeReader { protected BitFieldReader reader = null; @@ -322,20 +335,16 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - final LongColumnVector result; - if (previousVector == null) { - result = new LongColumnVector(); - } else { - result = (LongColumnVector) previousVector; - } + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + LongColumnVector result = (LongColumnVector) previousVector; // Read present/isNull stream - super.nextVector(result, batchSize); + super.nextVector(result, isNull, batchSize); // Read value entries based on isNull entries reader.nextVector(result, batchSize); - return result; } } @@ -387,20 +396,16 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - final LongColumnVector result; - if (previousVector == null) { - result = new LongColumnVector(); - } else { - result = (LongColumnVector) previousVector; - } + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + final LongColumnVector result = (LongColumnVector) previousVector; // Read present/isNull stream - super.nextVector(result, batchSize); + super.nextVector(result, isNull, batchSize); // Read value entries based on isNull entries - reader.nextVector(result, batchSize); - return result; + reader.nextVector(result, result.vector, batchSize); } @Override @@ -473,20 +478,16 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - final LongColumnVector result; - if (previousVector == null) { - result = new LongColumnVector(); - } else { - result = (LongColumnVector) previousVector; - } + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + final LongColumnVector result = (LongColumnVector) previousVector; // Read present/isNull stream - super.nextVector(result, batchSize); + super.nextVector(result, isNull, batchSize); // Read value entries based on isNull entries - reader.nextVector(result, batchSize); - return result; + reader.nextVector(result, result.vector, batchSize); } @Override @@ -559,20 +560,16 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - final 
LongColumnVector result; - if (previousVector == null) { - result = new LongColumnVector(); - } else { - result = (LongColumnVector) previousVector; - } + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + final LongColumnVector result = (LongColumnVector) previousVector; // Read present/isNull stream - super.nextVector(result, batchSize); + super.nextVector(result, isNull, batchSize); // Read value entries based on isNull entries - reader.nextVector(result, batchSize); - return result; + reader.nextVector(result, result.vector, batchSize); } @Override @@ -646,20 +643,16 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - final LongColumnVector result; - if (previousVector == null) { - result = new LongColumnVector(); - } else { - result = (LongColumnVector) previousVector; - } + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + final LongColumnVector result = (LongColumnVector) previousVector; // Read present/isNull stream - super.nextVector(result, batchSize); + super.nextVector(result, isNull, batchSize); // Read value entries based on isNull entries - reader.nextVector(result, batchSize); - return result; + reader.nextVector(result, result.vector, batchSize); } @Override @@ -719,16 +712,13 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - final DoubleColumnVector result; - if (previousVector == null) { - result = new DoubleColumnVector(); - } else { - result = (DoubleColumnVector) previousVector; - } + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + final DoubleColumnVector result = (DoubleColumnVector) previousVector; // Read present/isNull stream - super.nextVector(result, batchSize); + super.nextVector(result, isNull, batchSize); final boolean hasNulls = !result.noNulls; boolean allNulls = hasNulls; @@ -768,7 +758,6 @@ public class TreeReaderFactory { } result.isRepeating = repeating; } - return result; } @Override @@ -832,16 +821,13 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - final DoubleColumnVector result; - if (previousVector == null) { - result = new DoubleColumnVector(); - } else { - result = (DoubleColumnVector) previousVector; - } + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + final DoubleColumnVector result = (DoubleColumnVector) previousVector; // Read present/isNull stream - super.nextVector(result, batchSize); + super.nextVector(result, isNull, batchSize); final boolean hasNulls = !result.noNulls; boolean allNulls = hasNulls; @@ -881,8 +867,6 @@ public class TreeReaderFactory { } result.isRepeating = repeating; } - - return result; } @Override @@ -974,19 +958,15 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - final BytesColumnVector result; - if (previousVector == null) { - result = new BytesColumnVector(); - } else { - result = (BytesColumnVector) previousVector; - } + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + final BytesColumnVector result = (BytesColumnVector) previousVector; // Read 
present/isNull stream - super.nextVector(result, batchSize); + super.nextVector(result, isNull, batchSize); BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv, result, batchSize); - return result; } @Override @@ -1011,7 +991,6 @@ public class TreeReaderFactory { private final TimeZone readerTimeZone; private TimeZone writerTimeZone; private boolean hasSameTZRules; - private TimestampWritable scratchTimestampWritable; TimestampTreeReader(int columnId, boolean skipCorrupt) throws IOException { this(columnId, null, null, null, null, skipCorrupt); @@ -1115,9 +1094,9 @@ public class TreeReaderFactory { int newNanos = parseNanos(nanos.next()); // fix the rounding when we divided by 1000. if (millis >= 0) { - millis += newNanos / 1000000; + millis += newNanos / WriterImpl.NANOS_PER_MILLI; } else { - millis -= newNanos / 1000000; + millis -= newNanos / WriterImpl.NANOS_PER_MILLI; } long offset = 0; // If reader and writer time zones have different rules, adjust the timezone difference @@ -1144,31 +1123,45 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - final TimestampColumnVector result; - if (previousVector == null) { - result = new TimestampColumnVector(); - } else { - result = (TimestampColumnVector) previousVector; - } + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + TimestampColumnVector result = (TimestampColumnVector) previousVector; + super.nextVector(previousVector, isNull, batchSize); - result.reset(); - if (scratchTimestampWritable == null) { - scratchTimestampWritable = new TimestampWritable(); - } - Object obj; for (int i = 0; i < batchSize; i++) { - obj = next(scratchTimestampWritable); - if (obj == null) { - result.noNulls = false; - result.isNull[i] = true; - } else { - TimestampWritable writable = (TimestampWritable) obj; - result.set(i, writable.getTimestamp()); + if (result.noNulls || !result.isNull[i]) { + long millis = data.next() + base_timestamp; + int newNanos = parseNanos(nanos.next()); + if (millis < 0 && newNanos != 0) { + millis -= 1; + } + millis *= WriterImpl.MILLIS_PER_SECOND; + long offset = 0; + // If reader and writer time zones have different rules, adjust the timezone difference + // between reader and writer taking day light savings into account. + if (!hasSameTZRules) { + offset = writerTimeZone.getOffset(millis) - readerTimeZone.getOffset(millis); + } + long adjustedMillis = millis + offset; + // Sometimes the reader timezone might have changed after adding the adjustedMillis. + // To account for that change, check for any difference in reader timezone after + // adding adjustedMillis. If so use the new offset (offset at adjustedMillis point of time). 
+ if (!hasSameTZRules && + (readerTimeZone.getOffset(millis) != readerTimeZone.getOffset(adjustedMillis))) { + long newOffset = + writerTimeZone.getOffset(millis) - readerTimeZone.getOffset(adjustedMillis); + adjustedMillis = millis + newOffset; + } + result.time[i] = adjustedMillis; + result.nanos[i] = newNanos; + if (result.isRepeating && i != 0 && + (result.time[0] != result.time[i] || + result.nanos[0] != result.nanos[i])) { + result.isRepeating = false; + } } } - - return result; } private static int parseNanos(long serialized) { @@ -1253,20 +1246,16 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - final LongColumnVector result; - if (previousVector == null) { - result = new LongColumnVector(); - } else { - result = (LongColumnVector) previousVector; - } + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + final LongColumnVector result = (LongColumnVector) previousVector; // Read present/isNull stream - super.nextVector(result, batchSize); + super.nextVector(result, isNull, batchSize); // Read value entries based on isNull entries - reader.nextVector(result, batchSize); - return result; + reader.nextVector(result, result.vector, batchSize); } @Override @@ -1278,7 +1267,7 @@ public class TreeReaderFactory { public static class DecimalTreeReader extends TreeReader { protected InStream valueStream; protected IntegerReader scaleReader = null; - private LongColumnVector scratchScaleVector; + private int[] scratchScaleVector; private final int precision; private final int scale; @@ -1293,7 +1282,7 @@ public class TreeReaderFactory { super(columnId, present); this.precision = precision; this.scale = scale; - this.scratchScaleVector = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE); + this.scratchScaleVector = new int[VectorizedRowBatch.DEFAULT_SIZE]; this.valueStream = valueStream; if (scaleStream != null && encoding != null) { checkEncoding(encoding); @@ -1352,46 +1341,34 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - final DecimalColumnVector result; - if (previousVector == null) { - result = new DecimalColumnVector(precision, scale); - } else { - result = (DecimalColumnVector) previousVector; - } - - // Save the reference for isNull in the scratch vector - boolean[] scratchIsNull = scratchScaleVector.isNull; + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + final DecimalColumnVector result = (DecimalColumnVector) previousVector; // Read present/isNull stream - super.nextVector(result, batchSize); + super.nextVector(result, isNull, batchSize); + if (batchSize > scratchScaleVector.length) { + scratchScaleVector = new int[(int) batchSize]; + } + scaleReader.nextVector(result, scratchScaleVector, batchSize); // Read value entries based on isNull entries - if (result.isRepeating) { - if (!result.isNull[0]) { + if (result.noNulls) { + for (int r=0; r < batchSize; ++r) { BigInteger bInt = SerializationUtils.readBigInteger(valueStream); - short scaleInData = (short) scaleReader.next(); - HiveDecimal dec = HiveDecimal.create(bInt, scaleInData); - dec = HiveDecimal.enforcePrecisionScale(dec, precision, scale); - result.set(0, dec); + HiveDecimal dec = HiveDecimal.create(bInt, scratchScaleVector[r]); + result.set(r, dec); } - } else { - // result vector has isNull values set, use the 
same to read scale vector. - scratchScaleVector.isNull = result.isNull; - scaleReader.nextVector(scratchScaleVector, batchSize); - for (int i = 0; i < batchSize; i++) { - if (!result.isNull[i]) { + } else if (!result.isRepeating || !result.isNull[0]) { + for (int r=0; r < batchSize; ++r) { + if (!result.isNull[r]) { BigInteger bInt = SerializationUtils.readBigInteger(valueStream); - short scaleInData = (short) scratchScaleVector.vector[i]; - HiveDecimal dec = HiveDecimal.create(bInt, scaleInData); - dec = HiveDecimal.enforcePrecisionScale(dec, precision, scale); - result.set(i, dec); + HiveDecimal dec = HiveDecimal.create(bInt, scratchScaleVector[r]); + result.set(r, dec); } } } - // Switch back the null vector. - scratchScaleVector.isNull = scratchIsNull; - return result; } @Override @@ -1481,8 +1458,10 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - return reader.nextVector(previousVector, batchSize); + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + reader.nextVector(previousVector, isNull, batchSize); } @Override @@ -1501,7 +1480,7 @@ public class TreeReaderFactory { BytesColumnVector result, final int batchSize) throws IOException { // Read lengths scratchlcv.isNull = result.isNull; // Notice we are replacing the isNull vector here... - lengths.nextVector(scratchlcv, batchSize); + lengths.nextVector(scratchlcv, scratchlcv.vector, batchSize); int totalLength = 0; if (!scratchlcv.isRepeating) { for (int i = 0; i < batchSize; i++) { @@ -1532,31 +1511,35 @@ public class TreeReaderFactory { } // This method has the common code for reading in bytes into a BytesColumnVector. - public static void readOrcByteArrays(InStream stream, IntegerReader lengths, - LongColumnVector scratchlcv, - BytesColumnVector result, final int batchSize) throws IOException { - - byte[] allBytes = commonReadByteArrays(stream, lengths, scratchlcv, result, batchSize); - - // Too expensive to figure out 'repeating' by comparisons. - result.isRepeating = false; - int offset = 0; - if (!scratchlcv.isRepeating) { - for (int i = 0; i < batchSize; i++) { - if (!scratchlcv.isNull[i]) { - result.setRef(i, allBytes, offset, (int) scratchlcv.vector[i]); - offset += scratchlcv.vector[i]; - } else { - result.setRef(i, allBytes, 0, 0); + public static void readOrcByteArrays(InStream stream, + IntegerReader lengths, + LongColumnVector scratchlcv, + BytesColumnVector result, + int batchSize) throws IOException { + if (result.noNulls || !(result.isRepeating && result.isNull[0])) { + byte[] allBytes = commonReadByteArrays(stream, lengths, scratchlcv, + result, (int) batchSize); + + // Too expensive to figure out 'repeating' by comparisons. 
+ result.isRepeating = false; + int offset = 0; + if (!scratchlcv.isRepeating) { + for (int i = 0; i < batchSize; i++) { + if (!scratchlcv.isNull[i]) { + result.setRef(i, allBytes, offset, (int) scratchlcv.vector[i]); + offset += scratchlcv.vector[i]; + } else { + result.setRef(i, allBytes, 0, 0); + } } - } - } else { - for (int i = 0; i < batchSize; i++) { - if (!scratchlcv.isNull[i]) { - result.setRef(i, allBytes, offset, (int) scratchlcv.vector[0]); - offset += scratchlcv.vector[0]; - } else { - result.setRef(i, allBytes, 0, 0); + } else { + for (int i = 0; i < batchSize; i++) { + if (!scratchlcv.isNull[i]) { + result.setRef(i, allBytes, offset, (int) scratchlcv.vector[0]); + offset += scratchlcv.vector[0]; + } else { + result.setRef(i, allBytes, 0, 0); + } } } } @@ -1641,19 +1624,16 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - final BytesColumnVector result; - if (previousVector == null) { - result = new BytesColumnVector(); - } else { - result = (BytesColumnVector) previousVector; - } + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + final BytesColumnVector result = (BytesColumnVector) previousVector; // Read present/isNull stream - super.nextVector(result, batchSize); + super.nextVector(result, isNull, batchSize); - BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv, result, batchSize); - return result; + BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv, + result, batchSize); } @Override @@ -1816,18 +1796,15 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - final BytesColumnVector result; + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + final BytesColumnVector result = (BytesColumnVector) previousVector; int offset; int length; - if (previousVector == null) { - result = new BytesColumnVector(); - } else { - result = (BytesColumnVector) previousVector; - } // Read present/isNull stream - super.nextVector(result, batchSize); + super.nextVector(result, isNull, batchSize); if (dictionaryBuffer != null) { @@ -1838,7 +1815,8 @@ public class TreeReaderFactory { // Read string offsets scratchlcv.isNull = result.isNull; - reader.nextVector(scratchlcv, batchSize); + scratchlcv.ensureSize((int) batchSize, false); + reader.nextVector(scratchlcv, scratchlcv.vector, batchSize); if (!scratchlcv.isRepeating) { // The vector has non-repeating strings. Iterate thru the batch @@ -1878,7 +1856,6 @@ public class TreeReaderFactory { } } } - return result; } int getDictionaryEntryLength(int entry, int offset) { @@ -1936,11 +1913,13 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { // Get the vector of strings from StringTreeReader, then make a 2nd pass to // adjust down the length (right trim and truncate) if necessary. 
- BytesColumnVector result = (BytesColumnVector) super.nextVector(previousVector, batchSize); - + super.nextVector(previousVector, isNull, batchSize); + BytesColumnVector result = (BytesColumnVector) previousVector; int adjustedDownLen; if (result.isRepeating) { if (result.noNulls || !result.isNull[0]) { @@ -1973,7 +1952,6 @@ public class TreeReaderFactory { } } } - return result; } } @@ -2010,10 +1988,13 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { // Get the vector of strings from StringTreeReader, then make a 2nd pass to // adjust down the length (truncate) if necessary. - BytesColumnVector result = (BytesColumnVector) super.nextVector(previousVector, batchSize); + super.nextVector(previousVector, isNull, batchSize); + BytesColumnVector result = (BytesColumnVector) previousVector; int adjustedDownLen; if (result.isRepeating) { @@ -2045,62 +2026,26 @@ public class TreeReaderFactory { } } } - return result; } } protected static class StructTreeReader extends TreeReader { - private final int readColumnCount; - private final int resultColumnCount; protected final TreeReader[] fields; - private final String[] fieldNames; - protected StructTreeReader( - int columnId, - TreeReaderSchema treeReaderSchema, - boolean[] included, - boolean skipCorrupt) throws IOException { + protected StructTreeReader(int columnId, + TypeDescription readerSchema, + SchemaEvolution treeReaderSchema, + boolean[] included, + boolean skipCorrupt) throws IOException { super(columnId); - OrcProto.Type fileStructType = treeReaderSchema.getFileTypes().get(columnId); - - OrcProto.Type schemaStructType = treeReaderSchema.getSchemaTypes().get(columnId); + TypeDescription fileSchema = treeReaderSchema.getFileType(readerSchema); - readColumnCount = Math.min(fileStructType.getFieldNamesCount(), schemaStructType.getFieldNamesCount()); - - if (columnId == treeReaderSchema.getInnerStructSubtype()) { - // If there are more result columns than reader columns, we will default those additional - // columns to NULL. - resultColumnCount = schemaStructType.getFieldNamesCount(); - } else { - resultColumnCount = readColumnCount; - } - - this.fields = new TreeReader[readColumnCount]; - this.fieldNames = new String[readColumnCount]; - - if (included == null) { - for (int i = 0; i < readColumnCount; ++i) { - int subtype = schemaStructType.getSubtypes(i); - this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt); - // Use the treeReaderSchema evolution name since file/reader types may not have the real column name. - this.fieldNames[i] = schemaStructType.getFieldNames(i); - } - } else { - for (int i = 0; i < readColumnCount; ++i) { - int subtype = schemaStructType.getSubtypes(i); - if (subtype >= included.length) { - throw new IOException("subtype " + subtype + " exceeds the included array size " + - included.length + " fileTypes " + treeReaderSchema.getFileTypes().toString() + - " schemaTypes " + treeReaderSchema.getSchemaTypes().toString() + - " innerStructSubtype " + treeReaderSchema.getInnerStructSubtype()); - } - if (included[subtype]) { - this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt); - } - // Use the treeReaderSchema evolution name since file/reader types may not have the real column name. 
- this.fieldNames[i] = schemaStructType.getFieldNames(i); - } + List<TypeDescription> childrenTypes = readerSchema.getChildren(); + this.fields = new TreeReader[childrenTypes.size()]; + for (int i = 0; i < fields.length; ++i) { + TypeDescription subtype = childrenTypes.get(i); + this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt); } } @@ -2120,65 +2065,52 @@ public class TreeReaderFactory { OrcStruct result = null; if (valuePresent) { if (previous == null) { - result = new OrcStruct(resultColumnCount); + result = new OrcStruct(fields.length); } else { result = (OrcStruct) previous; // If the input format was initialized with a file with a // different number of fields, the number of fields needs to // be updated to the correct number - if (result.getNumFields() != resultColumnCount) { - result.setNumFields(resultColumnCount); - } + result.setNumFields(fields.length); } - for (int i = 0; i < readColumnCount; ++i) { + for (int i = 0; i < fields.length; ++i) { if (fields[i] != null) { result.setFieldValue(i, fields[i].next(result.getFieldValue(i))); } } - if (resultColumnCount > readColumnCount) { - for (int i = readColumnCount; i < resultColumnCount; ++i) { - // Default new treeReaderSchema evolution fields to NULL. - result.setFieldValue(i, null); - } - } } return result; } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - final ColumnVector[] result; - if (previousVector == null) { - result = new ColumnVector[readColumnCount]; - } else { - result = (ColumnVector[]) previousVector; + public void nextBatch(VectorizedRowBatch batch, + int batchSize) throws IOException { + for(int i=0; i < fields.length && + (vectorColumnCount == -1 || i < vectorColumnCount); ++i) { + batch.cols[i].reset(); + batch.cols[i].ensureSize((int) batchSize, false); + fields[i].nextVector(batch.cols[i], null, batchSize); } + } - // Read all the members of struct as column vectors - for (int i = 0; i < readColumnCount; i++) { - if (fields[i] != null) { - if (result[i] == null) { - result[i] = (ColumnVector) fields[i].nextVector(null, batchSize); - } else { - fields[i].nextVector(result[i], batchSize); - } - } - } + @Override + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + super.nextVector(previousVector, isNull, batchSize); + StructColumnVector result = (StructColumnVector) previousVector; + if (result.noNulls || !(result.isRepeating && result.isNull[0])) { + result.isRepeating = false; - // Default additional treeReaderSchema evolution fields to NULL. - if (vectorColumnCount != -1 && vectorColumnCount > readColumnCount) { - for (int i = readColumnCount; i < vectorColumnCount; ++i) { - ColumnVector colVector = result[i]; - if (colVector != null) { - colVector.isRepeating = true; - colVector.noNulls = false; - colVector.isNull[0] = true; + // Read all the members of struct as column vectors + boolean[] mask = result.noNulls ? 
null : result.isNull; + for (int f = 0; f < fields.length; f++) { + if (fields[f] != null) { + fields[f].nextVector(result.fields[f], mask, batchSize); } } } - - return result; } @Override @@ -2208,19 +2140,18 @@ public class TreeReaderFactory { protected final TreeReader[] fields; protected RunLengthByteReader tags; - protected UnionTreeReader(int columnId, - TreeReaderSchema treeReaderSchema, - boolean[] included, - boolean skipCorrupt) throws IOException { - super(columnId); - OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId); - int fieldCount = type.getSubtypesCount(); + protected UnionTreeReader(int fileColumn, + TypeDescription readerSchema, + SchemaEvolution treeReaderSchema, + boolean[] included, + boolean skipCorrupt) throws IOException { + super(fileColumn); + List<TypeDescription> childrenTypes = readerSchema.getChildren(); + int fieldCount = childrenTypes.size(); this.fields = new TreeReader[fieldCount]; for (int i = 0; i < fieldCount; ++i) { - int subtype = type.getSubtypes(i); - if (included == null || included[subtype]) { - this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt); - } + TypeDescription subtype = childrenTypes.get(i); + this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt); } } @@ -2252,9 +2183,25 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - throw new UnsupportedOperationException( - "NextVector is not supported operation for Union type"); + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + UnionColumnVector result = (UnionColumnVector) previousVector; + super.nextVector(result, isNull, batchSize); + if (result.noNulls || !(result.isRepeating && result.isNull[0])) { + result.isRepeating = false; + tags.nextVector(result.noNulls ? 
null : result.isNull, result.tags, + batchSize); + boolean[] ignore = new boolean[(int) batchSize]; + for (int f = 0; f < result.fields.length; ++f) { + // build the ignore list for this tag + for (int r = 0; r < batchSize; ++r) { + ignore[r] = (!result.noNulls && result.isNull[r]) || + result.tags[r] != f; + } + fields[f].nextVector(result.fields[f], ignore, batchSize); + } + } } @Override @@ -2288,13 +2235,15 @@ public class TreeReaderFactory { protected final TreeReader elementReader; protected IntegerReader lengths = null; - protected ListTreeReader(int columnId, - TreeReaderSchema treeReaderSchema, - boolean[] included, - boolean skipCorrupt) throws IOException { - super(columnId); - OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId); - elementReader = createTreeReader(type.getSubtypes(0), treeReaderSchema, included, skipCorrupt); + protected ListTreeReader(int fileColumn, + TypeDescription readerSchema, + SchemaEvolution treeReaderSchema, + boolean[] included, + boolean skipCorrupt) throws IOException { + super(fileColumn); + TypeDescription elementType = readerSchema.getChildren().get(0); + elementReader = createTreeReader(elementType, treeReaderSchema, included, + skipCorrupt); } @Override @@ -2335,9 +2284,27 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previous, final int batchSize) throws IOException { - throw new UnsupportedOperationException( - "NextVector is not supported operation for List type"); + public void nextVector(ColumnVector previous, + boolean[] isNull, + int batchSize) throws IOException { + ListColumnVector result = (ListColumnVector) previous; + super.nextVector(result, isNull, batchSize); + // if we have some non-null values, then read them + if (result.noNulls || !(result.isRepeating && result.isNull[0])) { + lengths.nextVector(result, result.lengths, batchSize); + // even with repeating lengths, the list doesn't repeat + result.isRepeating = false; + // build the offsets vector and figure out how many children to read + result.childCount = 0; + for (int r = 0; r < batchSize; ++r) { + if (result.noNulls || !result.isNull[r]) { + result.offsets[r] = result.childCount; + result.childCount += result.lengths[r]; + } + } + result.child.ensureSize(result.childCount, false); + elementReader.nextVector(result.child, null, result.childCount); + } } @Override @@ -2378,24 +2345,16 @@ public class TreeReaderFactory { protected final TreeReader valueReader; protected IntegerReader lengths = null; - protected MapTreeReader(int columnId, - TreeReaderSchema treeReaderSchema, - boolean[] included, - boolean skipCorrupt) throws IOException { - super(columnId); - OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId); - int keyColumn = type.getSubtypes(0); - int valueColumn = type.getSubtypes(1); - if (included == null || included[keyColumn]) { - keyReader = createTreeReader(keyColumn, treeReaderSchema, included, skipCorrupt); - } else { - keyReader = null; - } - if (included == null || included[valueColumn]) { - valueReader = createTreeReader(valueColumn, treeReaderSchema, included, skipCorrupt); - } else { - valueReader = null; - } + protected MapTreeReader(int fileColumn, + TypeDescription readerSchema, + SchemaEvolution treeReaderSchema, + boolean[] included, + boolean skipCorrupt) throws IOException { + super(fileColumn); + TypeDescription keyType = readerSchema.getChildren().get(0); + TypeDescription valueType = readerSchema.getChildren().get(1); + keyReader = createTreeReader(keyType, treeReaderSchema, 
included, skipCorrupt); + valueReader = createTreeReader(valueType, treeReaderSchema, included, skipCorrupt); } @Override @@ -2429,9 +2388,28 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previous, final int batchSize) throws IOException { - throw new UnsupportedOperationException( - "NextVector is not supported operation for Map type"); + public void nextVector(ColumnVector previous, + boolean[] isNull, + int batchSize) throws IOException { + MapColumnVector result = (MapColumnVector) previous; + super.nextVector(result, isNull, batchSize); + if (result.noNulls || !(result.isRepeating && result.isNull[0])) { + lengths.nextVector(result, result.lengths, batchSize); + // even with repeating lengths, the map doesn't repeat + result.isRepeating = false; + // build the offsets vector and figure out how many children to read + result.childCount = 0; + for (int r = 0; r < batchSize; ++r) { + if (result.noNulls || !result.isNull[r]) { + result.offsets[r] = result.childCount; + result.childCount += result.lengths[r]; + } + } + result.keys.ensureSize(result.childCount, false); + result.values.ensureSize(result.childCount, false); + keyReader.nextVector(result.keys, null, result.childCount); + valueReader.nextVector(result.values, null, result.childCount); + } } @Override @@ -2471,61 +2449,61 @@ public class TreeReaderFactory { } } - public static TreeReader createTreeReader(int columnId, - TreeReaderSchema treeReaderSchema, - boolean[] included, - boolean skipCorrupt - ) throws IOException { - OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId); - switch (type.getKind()) { + public static TreeReader createTreeReader(TypeDescription readerType, + SchemaEvolution evolution, + boolean[] included, + boolean skipCorrupt + ) throws IOException { + TypeDescription fileType = evolution.getFileType(readerType); + if (fileType == null || + (included != null && !included[readerType.getId()])) { + return new NullTreeReader(0); + } + switch (readerType.getCategory()) { case BOOLEAN: - return new BooleanTreeReader(columnId); + return new BooleanTreeReader(fileType.getId()); case BYTE: - return new ByteTreeReader(columnId); + return new ByteTreeReader(fileType.getId()); case DOUBLE: - return new DoubleTreeReader(columnId); + return new DoubleTreeReader(fileType.getId()); case FLOAT: - return new FloatTreeReader(columnId); + return new FloatTreeReader(fileType.getId()); case SHORT: - return new ShortTreeReader(columnId); + return new ShortTreeReader(fileType.getId()); case INT: - return new IntTreeReader(columnId); + return new IntTreeReader(fileType.getId()); case LONG: - return new LongTreeReader(columnId, skipCorrupt); + return new LongTreeReader(fileType.getId(), skipCorrupt); case STRING: - return new StringTreeReader(columnId); + return new StringTreeReader(fileType.getId()); case CHAR: - if (!type.hasMaximumLength()) { - throw new IllegalArgumentException("ORC char type has no length specified"); - } - return new CharTreeReader(columnId, type.getMaximumLength()); + return new CharTreeReader(fileType.getId(), readerType.getMaxLength()); case VARCHAR: - if (!type.hasMaximumLength()) { - throw new IllegalArgumentException("ORC varchar type has no length specified"); - } - return new VarcharTreeReader(columnId, type.getMaximumLength()); + return new VarcharTreeReader(fileType.getId(), readerType.getMaxLength()); case BINARY: - return new BinaryTreeReader(columnId); + return new BinaryTreeReader(fileType.getId()); case TIMESTAMP: - return new 
TimestampTreeReader(columnId, skipCorrupt); + return new TimestampTreeReader(fileType.getId(), skipCorrupt); case DATE: - return new DateTreeReader(columnId); + return new DateTreeReader(fileType.getId()); case DECIMAL: - int precision = - type.hasPrecision() ? type.getPrecision() : HiveDecimal.SYSTEM_DEFAULT_PRECISION; - int scale = type.hasScale() ? type.getScale() : HiveDecimal.SYSTEM_DEFAULT_SCALE; - return new DecimalTreeReader(columnId, precision, scale); + return new DecimalTreeReader(fileType.getId(), readerType.getPrecision(), + readerType.getScale()); case STRUCT: - return new StructTreeReader(columnId, treeReaderSchema, included, skipCorrupt); + return new StructTreeReader(fileType.getId(), readerType, + evolution, included, skipCorrupt); case LIST: - return new ListTreeReader(columnId, treeReaderSchema, included, skipCorrupt); + return new ListTreeReader(fileType.getId(), readerType, + evolution, included, skipCorrupt); case MAP: - return new MapTreeReader(columnId, treeReaderSchema, included, skipCorrupt); + return new MapTreeReader(fileType.getId(), readerType, evolution, + included, skipCorrupt); case UNION: - return new UnionTreeReader(columnId, treeReaderSchema, included, skipCorrupt); + return new UnionTreeReader(fileType.getId(), readerType, + evolution, included, skipCorrupt); default: throw new IllegalArgumentException("Unsupported type " + - type.getKind()); + readerType.getCategory()); } } }
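The central API change in this patch is that TreeReader.nextVector no longer allocates and returns a column vector; it fills a caller-supplied ColumnVector and accepts a boolean[] isNull mask from the enclosing reader, so compound readers (struct, union, list, map) can tell their children which rows are already null at a higher level. The following standalone sketch (plain Java with illustrative names, not Hive code) models how a child combines the parent mask with its own present stream, assuming a set bit in the stand-in BitSet means "value present":

    import java.util.Arrays;
    import java.util.BitSet;

    public class NullMergeSketch {
      // parentIsNull == null means no row is null at the parent level;
      // present == null stands in for a column without a present stream.
      static boolean[] mergeNulls(boolean[] parentIsNull, BitSet present, int batchSize) {
        boolean[] isNull = new boolean[batchSize];
        for (int r = 0; r < batchSize; ++r) {
          if (parentIsNull != null && parentIsNull[r]) {
            isNull[r] = true;                               // a higher-level null wins
          } else {
            isNull[r] = present != null && !present.get(r); // otherwise consult the present stream
          }
        }
        return isNull;
      }

      public static void main(String[] args) {
        BitSet present = new BitSet();
        present.set(0);
        present.set(2);                                     // rows 0 and 2 carry values
        boolean[] parent = {false, false, true, false};
        // rows 1 and 3 are null from the present stream, row 2 from the parent mask
        System.out.println(Arrays.toString(mergeNulls(parent, present, 4)));
        // prints: [false, true, true, true]
      }
    }

The patched TreeReader.nextVector additionally tracks noNulls and an allNull flag so it can set isRepeating when every selected row is null; the sketch omits that bookkeeping for brevity.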
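Similarly, the new ListTreeReader and MapTreeReader implementations above convert per-row lengths into child offsets before issuing a single bulk read of the children. A standalone model of that offsets computation (again illustrative, not ORC code):

    import java.util.Arrays;

    public class OffsetsSketch {
      public static void main(String[] args) {
        long[] lengths = {2, 3, 0, 1};                  // per-row list lengths from the length stream
        boolean[] isNull = {false, false, true, false};
        long[] offsets = new long[lengths.length];
        long childCount = 0;
        for (int r = 0; r < lengths.length; ++r) {
          if (!isNull[r]) {
            offsets[r] = childCount;                    // where this row's children start
            childCount += lengths[r];
          }
        }
        System.out.println(Arrays.toString(offsets) + " childCount=" + childCount);
        // prints: [0, 2, 0, 5] childCount=6 -- the element reader is then asked
        // for all 6 child values in one nextVector(child, null, childCount) call
      }
    }

This is why the reader calls result.child.ensureSize(result.childCount, false) before delegating: the child vector must hold the children of every row in the batch, which is generally a different count than batchSize.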
http://git-wip-us.apache.org/repos/asf/hive/blob/0dd4621f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java index 816b52d..e4d2e6e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java @@ -71,14 +71,29 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect OrcInputFormat.raiseAcidTablesMustBeReadWithAcidReaderException(conf); } + rbCtx = Utilities.getVectorizedRowBatchCtx(conf); /** * Do we have schema on read in the configuration variables? */ - TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr(conf, /* isAcidRead */ false); - List<OrcProto.Type> types = file.getTypes(); - Reader.Options options = new Reader.Options(); - options.schema(schema); + int dataColumns = rbCtx.getDataColumnCount(); + TypeDescription schema = + OrcInputFormat.getDesiredRowTypeDescr(conf, false, dataColumns); + if (schema == null) { + schema = file.getSchema(); + // Even if the user isn't doing schema evolution, cut the schema + // to the desired size. + if (schema.getCategory() == TypeDescription.Category.STRUCT && + schema.getChildren().size() > dataColumns) { + schema = schema.clone(); + List<TypeDescription> children = schema.getChildren(); + for(int c = children.size() - 1; c >= dataColumns; --c) { + children.remove(c); + } + } + } + Reader.Options options = new Reader.Options().schema(schema); + this.offset = fileSplit.getStart(); this.length = fileSplit.getLength(); options.range(offset, length); @@ -87,8 +102,6 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect this.reader = file.rowsOptions(options); - rbCtx = Utilities.getVectorizedRowBatchCtx(conf); - columnsToIncludeTruncated = rbCtx.getColumnsToIncludeTruncated(conf); int partitionColumnCount = rbCtx.getPartitionColumnCount(); @@ -103,9 +116,6 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect @Override public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException { - if (!reader.hasNext()) { - return false; - } try { // Check and update partition cols if necessary. Ideally, this should be done // in CreateValue as the partition is constant per split. But since Hive uses @@ -118,7 +128,9 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect } addPartitionCols = false; } - reader.nextBatch(value); + if (!reader.nextBatch(value)) { + return false; + } } catch (Exception e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/hive/blob/0dd4621f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java index 70fe803..8e52907 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java @@ -101,8 +101,6 @@ public class WriterImpl extends org.apache.orc.impl.WriterImpl implements Writer } } - private static final long NANOS_PER_MILLI = 1000000; - /** * Set the value for a given column value within a batch. 
* @param rowId the row to set http://git-wip-us.apache.org/repos/asf/hive/blob/0dd4621f/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestTypeDescription.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestTypeDescription.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestTypeDescription.java index 2a82092..96af65a 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestTypeDescription.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestTypeDescription.java @@ -51,11 +51,11 @@ public class TestTypeDescription { .addField("f4", TypeDescription.createDouble()) .addField("f5", TypeDescription.createBoolean())) .addField("f6", TypeDescription.createChar().withMaxLength(100)); - assertEquals("struct<f1:union<tinyint,decimal(20,10)>,f2:struct<f3:date,f4:double,f5:boolean>,f6:char(100)>", + assertEquals("struct<f1:uniontype<tinyint,decimal(20,10)>,f2:struct<f3:date,f4:double,f5:boolean>,f6:char(100)>", struct.toString()); assertEquals( "{\"category\": \"struct\", \"id\": 0, \"max\": 8, \"fields\": [\n" + - " \"f1\": {\"category\": \"union\", \"id\": 1, \"max\": 3, \"children\": [\n" + + " \"f1\": {\"category\": \"uniontype\", \"id\": 1, \"max\": 3, \"children\": [\n" + " {\"category\": \"tinyint\", \"id\": 2, \"max\": 2},\n" + " {\"category\": \"decimal\", \"id\": 3, \"max\": 3, \"precision\": 20, \"scale\": 10}]},\n" + " \"f2\": {\"category\": \"struct\", \"id\": 4, \"max\": 7, \"fields\": [\n" +