Revert "HIVE-12159: Create vectorized readers for the complex types (Owen O'Malley, reviewed by Matt McCline)"
This reverts commit 0dd4621f34f6043071474220a082268cda124b9d. Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d559b347 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d559b347 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d559b347 Branch: refs/heads/master Commit: d559b34755010b5ed3ecc31fa423d01788e5e875 Parents: 40e0c38 Author: Matt McCline <[email protected]> Authored: Fri Apr 15 16:00:18 2016 -0700 Committer: Matt McCline <[email protected]> Committed: Fri Apr 15 16:00:18 2016 -0700 ---------------------------------------------------------------------- .../llap/io/decode/OrcEncodedDataConsumer.java | 45 +- orc/src/java/org/apache/orc/OrcUtils.java | 75 - orc/src/java/org/apache/orc/Reader.java | 6 - orc/src/java/org/apache/orc/RecordReader.java | 8 +- .../java/org/apache/orc/TypeDescription.java | 62 +- .../org/apache/orc/impl/BitFieldReader.java | 5 +- .../java/org/apache/orc/impl/IntegerReader.java | 26 +- .../apache/orc/impl/RunLengthByteReader.java | 36 +- .../apache/orc/impl/RunLengthIntegerReader.java | 31 +- .../orc/impl/RunLengthIntegerReaderV2.java | 33 +- .../java/org/apache/orc/impl/WriterImpl.java | 47 +- .../ql/exec/vector/VectorizedRowBatchCtx.java | 13 +- .../hadoop/hive/ql/io/orc/OrcInputFormat.java | 43 +- .../hive/ql/io/orc/OrcRawRecordMerger.java | 3 +- .../hadoop/hive/ql/io/orc/ReaderImpl.java | 12 +- .../hadoop/hive/ql/io/orc/RecordReaderImpl.java | 50 +- .../hadoop/hive/ql/io/orc/SchemaEvolution.java | 234 ++- .../hive/ql/io/orc/TreeReaderFactory.java | 838 ++++----- .../ql/io/orc/VectorizedOrcInputFormat.java | 32 +- .../hadoop/hive/ql/io/orc/WriterImpl.java | 2 + .../hive/ql/io/orc/TestTypeDescription.java | 4 +- .../hive/ql/io/orc/TestVectorOrcFile.java | 1634 +++++++++--------- .../hive/ql/io/orc/TestVectorizedORCReader.java | 7 +- .../hive/ql/exec/vector/BytesColumnVector.java | 11 - .../ql/exec/vector/TimestampColumnVector.java | 2 +- .../hive/ql/exec/vector/UnionColumnVector.java | 2 + 26 files changed, 1476 insertions(+), 1785 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java index baaa4d7..7ee263d 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.llap.io.decode; import java.io.IOException; -import java.util.List; import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch; import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData; @@ -28,12 +27,7 @@ import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch; import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata; import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata; import org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueMetrics; -import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.orc.CompressionCodec; import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer; @@ -77,35 +71,6 @@ public class OrcEncodedDataConsumer stripes[m.getStripeIx()] = m; } - private static ColumnVector createColumn(OrcProto.Type type, - int batchSize) { - switch (type.getKind()) { - case BOOLEAN: - case BYTE: - case SHORT: - case INT: - case LONG: - case DATE: - return new LongColumnVector(batchSize); - case FLOAT: - case DOUBLE: - return new DoubleColumnVector(batchSize); - case BINARY: - case STRING: - case CHAR: - case VARCHAR: - return new BytesColumnVector(batchSize); - case TIMESTAMP: - return new TimestampColumnVector(batchSize); - case DECIMAL: - return new DecimalColumnVector(batchSize, type.getPrecision(), - type.getScale()); - default: - throw new IllegalArgumentException("LLAP does not support " + - type.getKind()); - } - } - @Override protected void decodeBatch(OrcEncodedColumnBatch batch, Consumer<ColumnVectorBatch> downstreamConsumer) { @@ -147,15 +112,9 @@ public class OrcEncodedDataConsumer ColumnVectorBatch cvb = cvbPool.take(); assert cvb.cols.length == batch.getColumnIxs().length; // Must be constant per split. cvb.size = batchSize; - List<OrcProto.Type> types = fileMetadata.getTypes(); - int[] columnMapping = batch.getColumnIxs(); + for (int idx = 0; idx < batch.getColumnIxs().length; idx++) { - if (cvb.cols[idx] == null) { - // skip over the top level struct, but otherwise assume no complex - // types - cvb.cols[idx] = createColumn(types.get(columnMapping[idx]), batchSize); - } - columnReaders[idx].nextVector(cvb.cols[idx], null, batchSize); + cvb.cols[idx] = (ColumnVector)columnReaders[idx].nextVector(cvb.cols[idx], batchSize); } // we are done reading a batch, send it to consumer for processing http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/OrcUtils.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/OrcUtils.java b/orc/src/java/org/apache/orc/OrcUtils.java index 2ebe9a7..2e93254 100644 --- a/orc/src/java/org/apache/orc/OrcUtils.java +++ b/orc/src/java/org/apache/orc/OrcUtils.java @@ -449,79 +449,4 @@ public class OrcUtils { return columnId; } - /** - * Translate the given rootColumn from the list of types to a TypeDescription. - * @param types all of the types - * @param rootColumn translate this type - * @return a new TypeDescription that matches the given rootColumn - */ - public static - TypeDescription convertTypeFromProtobuf(List<OrcProto.Type> types, - int rootColumn) { - OrcProto.Type type = types.get(rootColumn); - switch (type.getKind()) { - case BOOLEAN: - return TypeDescription.createBoolean(); - case BYTE: - return TypeDescription.createByte(); - case SHORT: - return TypeDescription.createShort(); - case INT: - return TypeDescription.createInt(); - case LONG: - return TypeDescription.createLong(); - case FLOAT: - return TypeDescription.createFloat(); - case DOUBLE: - return TypeDescription.createDouble(); - case STRING: - return TypeDescription.createString(); - case CHAR: - return TypeDescription.createChar() - .withMaxLength(type.getMaximumLength()); - case VARCHAR: - return TypeDescription.createVarchar() - .withMaxLength(type.getMaximumLength()); - case BINARY: - return TypeDescription.createBinary(); - case TIMESTAMP: - return TypeDescription.createTimestamp(); - case DATE: - return TypeDescription.createDate(); - case DECIMAL: { - TypeDescription result = TypeDescription.createDecimal(); - if (type.hasScale()) { - result.withScale(type.getScale()); - } - if (type.hasPrecision()) { - result.withPrecision(type.getPrecision()); - } - return result; - } - case LIST: - return TypeDescription.createList( - convertTypeFromProtobuf(types, type.getSubtypes(0))); - case MAP: - return TypeDescription.createMap( - convertTypeFromProtobuf(types, type.getSubtypes(0)), - convertTypeFromProtobuf(types, type.getSubtypes(1))); - case STRUCT: { - TypeDescription result = TypeDescription.createStruct(); - for(int f=0; f < type.getSubtypesCount(); ++f) { - result.addField(type.getFieldNames(f), - convertTypeFromProtobuf(types, type.getSubtypes(f))); - } - return result; - } - case UNION: { - TypeDescription result = TypeDescription.createUnion(); - for(int f=0; f < type.getSubtypesCount(); ++f) { - result.addUnionChild( - convertTypeFromProtobuf(types, type.getSubtypes(f))); - } - return result; - } - } - throw new IllegalArgumentException("Unknown ORC type " + type.getKind()); - } } http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/Reader.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/Reader.java b/orc/src/java/org/apache/orc/Reader.java index 62a05e9..be722b5 100644 --- a/orc/src/java/org/apache/orc/Reader.java +++ b/orc/src/java/org/apache/orc/Reader.java @@ -116,15 +116,9 @@ public interface Reader { ColumnStatistics[] getStatistics(); /** - * Get the type of rows in this ORC file. - */ - TypeDescription getSchema(); - - /** * Get the list of types contained in the file. The root type is the first * type in the list. * @return the list of flattened types - * @deprecated use getSchema instead */ List<OrcProto.Type> getTypes(); http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/RecordReader.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/RecordReader.java b/orc/src/java/org/apache/orc/RecordReader.java index 09ba0f0..7229dda 100644 --- a/orc/src/java/org/apache/orc/RecordReader.java +++ b/orc/src/java/org/apache/orc/RecordReader.java @@ -30,11 +30,13 @@ public interface RecordReader { * controlled by the callers. Caller need to look at * VectorizedRowBatch.size of the retunred object to know the batch * size read. - * @param batch a row batch object to read into - * @return were more rows available to read? + * @param previousBatch a row batch object that can be reused by the reader + * @return the row batch that was read. The batch will have a non-zero row + * count if the pointer isn't at the end of the file * @throws java.io.IOException */ - boolean nextBatch(VectorizedRowBatch batch) throws IOException; + VectorizedRowBatch nextBatch(VectorizedRowBatch previousBatch + ) throws IOException; /** * Get the row number of the row that will be returned by the following http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/TypeDescription.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/TypeDescription.java b/orc/src/java/org/apache/orc/TypeDescription.java index b8e057e..bd900ac 100644 --- a/orc/src/java/org/apache/orc/TypeDescription.java +++ b/orc/src/java/org/apache/orc/TypeDescription.java @@ -61,7 +61,7 @@ public class TypeDescription { LIST("array", false), MAP("map", false), STRUCT("struct", false), - UNION("uniontype", false); + UNION("union", false); Category(String name, boolean isPrimitive) { this.name = name; @@ -258,66 +258,6 @@ public class TypeDescription { return id; } - public TypeDescription clone() { - TypeDescription result = new TypeDescription(category); - result.maxLength = maxLength; - result.precision = precision; - result.scale = scale; - if (fieldNames != null) { - result.fieldNames.addAll(fieldNames); - } - if (children != null) { - for(TypeDescription child: children) { - TypeDescription clone = child.clone(); - clone.parent = result; - result.children.add(clone); - } - } - return result; - } - - @Override - public int hashCode() { - return getId(); - } - - @Override - public boolean equals(Object other) { - if (other == null || other.getClass() != TypeDescription.class) { - return false; - } - if (other == this) { - return true; - } - TypeDescription castOther = (TypeDescription) other; - if (category != castOther.category || - getId() != castOther.getId() || - getMaximumId() != castOther.getMaximumId() || - maxLength != castOther.maxLength || - scale != castOther.scale || - precision != castOther.precision) { - return false; - } - if (children != null) { - if (children.size() != castOther.children.size()) { - return false; - } - for (int i = 0; i < children.size(); ++i) { - if (!children.get(i).equals(castOther.children.get(i))) { - return false; - } - } - } - if (category == Category.STRUCT) { - for(int i=0; i < fieldNames.size(); ++i) { - if (!fieldNames.get(i).equals(castOther.fieldNames.get(i))) { - return false; - } - } - } - return true; - } - /** * Get the maximum id assigned to this type or its children. * The first call will cause all of the the ids in tree to be assigned, so http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/impl/BitFieldReader.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/BitFieldReader.java b/orc/src/java/org/apache/orc/impl/BitFieldReader.java index dda7355..8d9d3cb 100644 --- a/orc/src/java/org/apache/orc/impl/BitFieldReader.java +++ b/orc/src/java/org/apache/orc/impl/BitFieldReader.java @@ -137,7 +137,7 @@ public class BitFieldReader { long previousLen) throws IOException { previous.isRepeating = true; for (int i = 0; i < previousLen; i++) { - if (previous.noNulls || !previous.isNull[i]) { + if (!previous.isNull[i]) { previous.vector[i] = next(); } else { // The default value of null for int types in vectorized @@ -150,8 +150,7 @@ public class BitFieldReader { // when determining the isRepeating flag. if (previous.isRepeating && i > 0 - && ((previous.vector[0] != previous.vector[i]) || - (previous.isNull[0] != previous.isNull[i]))) { + && ((previous.vector[i - 1] != previous.vector[i]) || (previous.isNull[i - 1] != previous.isNull[i]))) { previous.isRepeating = false; } } http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/impl/IntegerReader.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/IntegerReader.java b/orc/src/java/org/apache/orc/impl/IntegerReader.java index 8bef0f1..7dfd289 100644 --- a/orc/src/java/org/apache/orc/impl/IntegerReader.java +++ b/orc/src/java/org/apache/orc/impl/IntegerReader.java @@ -20,7 +20,7 @@ package org.apache.orc.impl; import java.io.IOException; -import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; /** * Interface for reading integers. @@ -57,25 +57,9 @@ public interface IntegerReader { /** * Return the next available vector for values. - * @param column the column being read - * @param data the vector to read into - * @param length the number of numbers to read - * @throws IOException - */ - void nextVector(ColumnVector column, - long[] data, - int length - ) throws IOException; - - /** - * Return the next available vector for values. Does not change the - * value of column.isRepeating. - * @param column the column being read - * @param data the vector to read into - * @param length the number of numbers to read + * @return * @throws IOException */ - void nextVector(ColumnVector column, - int[] data, - int length - ) throws IOException;} + void nextVector(LongColumnVector previous, final int previousLen) + throws IOException; +} http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java b/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java index 24bd051..380f3391 100644 --- a/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java +++ b/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java @@ -20,7 +20,7 @@ package org.apache.orc.impl; import java.io.EOFException; import java.io.IOException; -import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; /** * A reader that reads a sequence of bytes. A control byte is read before @@ -92,16 +92,16 @@ public class RunLengthByteReader { return result; } - public void nextVector(ColumnVector previous, long[] data, long size) + public void nextVector(LongColumnVector previous, long previousLen) throws IOException { previous.isRepeating = true; - for (int i = 0; i < size; i++) { + for (int i = 0; i < previousLen; i++) { if (!previous.isNull[i]) { - data[i] = next(); + previous.vector[i] = next(); } else { // The default value of null for int types in vectorized // processing is 1, so set that if the value is null - data[i] = 1; + previous.vector[i] = 1; } // The default value for nulls in Vectorization for int types is 1 @@ -109,36 +109,12 @@ public class RunLengthByteReader { // when determining the isRepeating flag. if (previous.isRepeating && i > 0 - && ((data[0] != data[i]) || - (previous.isNull[0] != previous.isNull[i]))) { + && ((previous.vector[i - 1] != previous.vector[i]) || (previous.isNull[i - 1] != previous.isNull[i]))) { previous.isRepeating = false; } } } - /** - * Read the next size bytes into the data array, skipping over any slots - * where isNull is true. - * @param isNull if non-null, skip any rows where isNull[r] is true - * @param data the array to read into - * @param size the number of elements to read - * @throws IOException - */ - public void nextVector(boolean[] isNull, int[] data, - long size) throws IOException { - if (isNull == null) { - for(int i=0; i < size; ++i) { - data[i] = next(); - } - } else { - for(int i=0; i < size; ++i) { - if (!isNull[i]) { - data[i] = next(); - } - } - } - } - public void seek(PositionProvider index) throws IOException { input.seek(index); int consumed = (int) index.getNext(); http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java index b91a263..0c90cde 100644 --- a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java +++ b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java @@ -20,7 +20,7 @@ package org.apache.orc.impl; import java.io.EOFException; import java.io.IOException; -import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; /** * A reader that reads a sequence of integers. @@ -99,17 +99,15 @@ public class RunLengthIntegerReader implements IntegerReader { } @Override - public void nextVector(ColumnVector previous, - long[] data, - int previousLen) throws IOException { + public void nextVector(LongColumnVector previous, final int previousLen) throws IOException { previous.isRepeating = true; for (int i = 0; i < previousLen; i++) { if (!previous.isNull[i]) { - data[i] = next(); + previous.vector[i] = next(); } else { // The default value of null for int type in vectorized // processing is 1, so set that if the value is null - data[i] = 1; + previous.vector[i] = 1; } // The default value for nulls in Vectorization for int types is 1 @@ -117,32 +115,13 @@ public class RunLengthIntegerReader implements IntegerReader { // when determining the isRepeating flag. if (previous.isRepeating && i > 0 - && (data[0] != data[i] || previous.isNull[0] != previous.isNull[i])) { + && (previous.vector[i - 1] != previous.vector[i] || previous.isNull[i - 1] != previous.isNull[i])) { previous.isRepeating = false; } } } @Override - public void nextVector(ColumnVector vector, - int[] data, - int size) throws IOException { - if (vector.noNulls) { - for(int r=0; r < data.length && r < size; ++r) { - data[r] = (int) next(); - } - } else if (!(vector.isRepeating && vector.isNull[0])) { - for(int r=0; r < data.length && r < size; ++r) { - if (!vector.isNull[r]) { - data[r] = (int) next(); - } else { - data[r] = 1; - } - } - } - } - - @Override public void seek(PositionProvider index) throws IOException { input.seek(index); int consumed = (int) index.getNext(); http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java index 610d9b5..c6d685a 100644 --- a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java +++ b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java @@ -21,9 +21,9 @@ import java.io.EOFException; import java.io.IOException; import java.util.Arrays; -import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; /** * A reader that reads a sequence of light weight compressed integers. Refer @@ -360,17 +360,15 @@ public class RunLengthIntegerReaderV2 implements IntegerReader { } @Override - public void nextVector(ColumnVector previous, - long[] data, - int previousLen) throws IOException { + public void nextVector(LongColumnVector previous, final int previousLen) throws IOException { previous.isRepeating = true; for (int i = 0; i < previousLen; i++) { if (!previous.isNull[i]) { - data[i] = next(); + previous.vector[i] = next(); } else { // The default value of null for int type in vectorized // processing is 1, so set that if the value is null - data[i] = 1; + previous.vector[i] = 1; } // The default value for nulls in Vectorization for int types is 1 @@ -378,29 +376,10 @@ public class RunLengthIntegerReaderV2 implements IntegerReader { // when determining the isRepeating flag. if (previous.isRepeating && i > 0 - && (data[0] != data[i] || - previous.isNull[0] != previous.isNull[i])) { + && (previous.vector[i - 1] != previous.vector[i] || + previous.isNull[i - 1] != previous.isNull[i])) { previous.isRepeating = false; } } } - - @Override - public void nextVector(ColumnVector vector, - int[] data, - int size) throws IOException { - if (vector.noNulls) { - for(int r=0; r < data.length && r < size; ++r) { - data[r] = (int) next(); - } - } else if (!(vector.isRepeating && vector.isNull[0])) { - for(int r=0; r < data.length && r < size; ++r) { - if (!vector.isNull[r]) { - data[r] = (int) next(); - } else { - data[r] = 1; - } - } - } - } } http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/orc/src/java/org/apache/orc/impl/WriterImpl.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/WriterImpl.java b/orc/src/java/org/apache/orc/impl/WriterImpl.java index b2966e0..f8afe06 100644 --- a/orc/src/java/org/apache/orc/impl/WriterImpl.java +++ b/orc/src/java/org/apache/orc/impl/WriterImpl.java @@ -1693,10 +1693,9 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } } - public static long MILLIS_PER_DAY = 24 * 60 * 60 * 1000; - public static long NANOS_PER_MILLI = 1000000; public static final int MILLIS_PER_SECOND = 1000; static final int NANOS_PER_SECOND = 1000000000; + static final int MILLIS_PER_NANO = 1000000; public static final String BASE_TIMESTAMP_STRING = "2015-01-01 00:00:00"; private static class TimestampTreeWriter extends TreeWriter { @@ -2262,36 +2261,32 @@ public class WriterImpl implements Writer, MemoryManager.Callback { } } else { // write the records in runs of the same tag - int[] currentStart = new int[vec.fields.length]; - int[] currentLength = new int[vec.fields.length]; + byte prevTag = 0; + int currentRun = 0; + boolean started = false; for(int i=0; i < length; ++i) { - // only need to deal with the non-nulls, since the nulls were dealt - // with in the super method. - if (vec.noNulls || !vec.isNull[i + offset]) { + if (!vec.isNull[i + offset]) { byte tag = (byte) vec.tags[offset + i]; tags.write(tag); - if (currentLength[tag] == 0) { - // start a new sequence - currentStart[tag] = i + offset; - currentLength[tag] = 1; - } else if (currentStart[tag] + currentLength[tag] == i + offset) { - // ok, we are extending the current run for that tag. - currentLength[tag] += 1; - } else { - // otherwise, we need to close off the old run and start a new one - childrenWriters[tag].writeBatch(vec.fields[tag], - currentStart[tag], currentLength[tag]); - currentStart[tag] = i + offset; - currentLength[tag] = 1; + if (!started) { + started = true; + currentRun = i; + prevTag = tag; + } else if (tag != prevTag) { + childrenWriters[prevTag].writeBatch(vec.fields[prevTag], + offset + currentRun, i - currentRun); + currentRun = i; + prevTag = tag; } + } else if (started) { + started = false; + childrenWriters[prevTag].writeBatch(vec.fields[prevTag], + offset + currentRun, i - currentRun); } } - // write out any left over sequences - for(int tag=0; tag < currentStart.length; ++tag) { - if (currentLength[tag] != 0) { - childrenWriters[tag].writeBatch(vec.fields[tag], currentStart[tag], - currentLength[tag]); - } + if (started) { + childrenWriters[prevTag].writeBatch(vec.fields[prevTag], + offset + currentRun, length - currentRun); } } } http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java index 82a97e0..0724191 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java @@ -215,9 +215,12 @@ public class VectorizedRowBatchCtx { LOG.info("createVectorizedRowBatch columnsToIncludeTruncated " + Arrays.toString(columnsToIncludeTruncated)); int totalColumnCount = rowColumnTypeInfos.length + scratchColumnTypeNames.length; VectorizedRowBatch result = new VectorizedRowBatch(totalColumnCount); - for (int i = 0; i < dataColumnCount; i++) { - TypeInfo typeInfo = rowColumnTypeInfos[i]; - result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo); + + for (int i = 0; i < columnsToIncludeTruncated.length; i++) { + if (columnsToIncludeTruncated[i]) { + TypeInfo typeInfo = rowColumnTypeInfos[i]; + result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo); + } } for (int i = dataColumnCount; i < dataColumnCount + partitionColumnCount; i++) { @@ -473,8 +476,8 @@ public class VectorizedRowBatchCtx { bcv.isNull[0] = true; bcv.isRepeating = true; } else { - bcv.setVal(0, sVal.getBytes()); - bcv.isRepeating = true; + bcv.fill(sVal.getBytes()); + bcv.isNull[0] = false; } } break; http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index fcb8ca4..fe0be7b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -301,7 +301,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, /** * Do we have schema on read in the configuration variables? */ - TypeDescription schema = getDesiredRowTypeDescr(conf, false, Integer.MAX_VALUE); + TypeDescription schema = getDesiredRowTypeDescr(conf, /* isAcidRead */ false); Reader.Options options = new Reader.Options().range(offset, length); options.schema(schema); @@ -1743,7 +1743,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, /** * Do we have schema on read in the configuration variables? */ - TypeDescription schema = getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE); + TypeDescription schema = getDesiredRowTypeDescr(conf, /* isAcidRead */ true); final Reader reader; final int bucket; @@ -1994,13 +1994,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, /** * Convert a Hive type property string that contains separated type names into a list of * TypeDescription objects. - * @param hiveTypeProperty the desired types from hive - * @param maxColumns the maximum number of desired columns * @return the list of TypeDescription objects. */ - public static ArrayList<TypeDescription> - typeDescriptionsFromHiveTypeProperty(String hiveTypeProperty, - int maxColumns) { + public static ArrayList<TypeDescription> typeDescriptionsFromHiveTypeProperty( + String hiveTypeProperty) { // CONSDIER: We need a type name parser for TypeDescription. @@ -2008,9 +2005,6 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, ArrayList<TypeDescription> typeDescrList =new ArrayList<TypeDescription>(typeInfoList.size()); for (TypeInfo typeInfo : typeInfoList) { typeDescrList.add(convertTypeInfo(typeInfo)); - if (typeDescrList.size() >= maxColumns) { - break; - } } return typeDescrList; } @@ -2097,18 +2091,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, } } - /** - * Generate the desired schema for reading the file. - * @param conf the configuration - * @param isAcidRead is this an acid format? - * @param dataColumns the desired number of data columns for vectorized read - * @return the desired schema or null if schema evolution isn't enabled - * @throws IOException - */ - public static TypeDescription getDesiredRowTypeDescr(Configuration conf, - boolean isAcidRead, - int dataColumns - ) throws IOException { + public static TypeDescription getDesiredRowTypeDescr(Configuration conf, boolean isAcidRead) + throws IOException { String columnNameProperty = null; String columnTypeProperty = null; @@ -2131,10 +2115,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, haveSchemaEvolutionProperties = false; } else { schemaEvolutionTypeDescrs = - typeDescriptionsFromHiveTypeProperty(columnTypeProperty, - dataColumns); - if (schemaEvolutionTypeDescrs.size() != - Math.min(dataColumns, schemaEvolutionColumnNames.size())) { + typeDescriptionsFromHiveTypeProperty(columnTypeProperty); + if (schemaEvolutionTypeDescrs.size() != schemaEvolutionColumnNames.size()) { haveSchemaEvolutionProperties = false; } } @@ -2165,9 +2147,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, return null; } schemaEvolutionTypeDescrs = - typeDescriptionsFromHiveTypeProperty(columnTypeProperty, dataColumns); - if (schemaEvolutionTypeDescrs.size() != - Math.min(dataColumns, schemaEvolutionColumnNames.size())) { + typeDescriptionsFromHiveTypeProperty(columnTypeProperty); + if (schemaEvolutionTypeDescrs.size() != schemaEvolutionColumnNames.size()) { return null; } @@ -2181,7 +2162,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, } columnNum++; } - if (virtualColumnClipNum != -1 && virtualColumnClipNum < dataColumns) { + if (virtualColumnClipNum != -1) { schemaEvolutionColumnNames = Lists.newArrayList(schemaEvolutionColumnNames.subList(0, virtualColumnClipNum)); schemaEvolutionTypeDescrs = Lists.newArrayList(schemaEvolutionTypeDescrs.subList(0, virtualColumnClipNum)); @@ -2198,7 +2179,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, // Desired schema does not include virtual columns or partition columns. TypeDescription result = TypeDescription.createStruct(); - for (int i = 0; i < schemaEvolutionTypeDescrs.size(); i++) { + for (int i = 0; i < schemaEvolutionColumnNames.size(); i++) { result.addField(schemaEvolutionColumnNames.get(i), schemaEvolutionTypeDescrs.get(i)); } http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index 0dd58b7..1fce282 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -447,8 +447,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ this.length = options.getLength(); this.validTxnList = validTxnList; - TypeDescription typeDescr = - OrcInputFormat.getDesiredRowTypeDescr(conf, true, Integer.MAX_VALUE); + TypeDescription typeDescr = OrcInputFormat.getDesiredRowTypeDescr(conf, /* isAcidRead */ true); objectInspector = OrcRecordUpdater.createEventSchema (OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(typeDescr))); http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java index 0bcf9e3..a031a92 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java @@ -26,8 +26,6 @@ import java.util.Collections; import java.util.List; import java.util.Set; -import org.apache.orc.OrcUtils; -import org.apache.orc.TypeDescription; import org.apache.orc.impl.BufferChunk; import org.apache.orc.ColumnStatistics; import org.apache.orc.impl.ColumnStatisticsImpl; @@ -73,7 +71,6 @@ public class ReaderImpl implements Reader { private final List<OrcProto.StripeStatistics> stripeStats; private final int metadataSize; protected final List<OrcProto.Type> types; - private final TypeDescription schema; private final List<OrcProto.UserMetadataItem> userMetadata; private final List<OrcProto.ColumnStatistics> fileStats; private final List<StripeInformation> stripes; @@ -246,11 +243,6 @@ public class ReaderImpl implements Reader { return result; } - @Override - public TypeDescription getSchema() { - return schema; - } - /** * Ensure this is an ORC file to prevent users from trying to read text * files or RC files as ORC files. @@ -394,9 +386,7 @@ public class ReaderImpl implements Reader { this.writerVersion = footerMetaData.writerVersion; this.stripes = convertProtoStripesToStripes(rInfo.footer.getStripesList()); } - this.schema = OrcUtils.convertTypeFromProtobuf(this.types, 0); } - /** * Get the WriterVersion based on the ORC file postscript. * @param writerVersion the integer writer version @@ -678,7 +668,7 @@ public class ReaderImpl implements Reader { options.include(include); } return new RecordReaderImpl(this.getStripes(), fileSystem, path, - options, schema, types, codec, bufferSize, rowIndexStride, conf); + options, types, codec, bufferSize, rowIndexStride, conf); } http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java index c214658..3975409 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java @@ -27,8 +27,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.orc.BooleanColumnStatistics; -import org.apache.orc.TypeDescription; +import org.apache.orc.OrcUtils; import org.apache.orc.impl.BufferChunk; import org.apache.orc.ColumnStatistics; import org.apache.orc.impl.ColumnStatisticsImpl; @@ -57,6 +58,7 @@ import org.apache.hadoop.hive.common.io.DiskRange; import org.apache.hadoop.hive.common.io.DiskRangeList; import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper; import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.orc.BloomFilterIO; import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; @@ -96,6 +98,7 @@ public class RecordReaderImpl implements RecordReader { private final SargApplier sargApp; // an array about which row groups aren't skipped private boolean[] includedRowGroups = null; + private final Configuration conf; private final MetadataReader metadata; private final DataReader dataReader; @@ -142,33 +145,33 @@ public class RecordReaderImpl implements RecordReader { FileSystem fileSystem, Path path, Reader.Options options, - TypeDescription fileSchema, List<OrcProto.Type> types, CompressionCodec codec, int bufferSize, long strideRate, Configuration conf ) throws IOException { - SchemaEvolution treeReaderSchema; - this.included = options.getInclude(); - included[0] = true; + + TreeReaderFactory.TreeReaderSchema treeReaderSchema; if (options.getSchema() == null) { if (LOG.isInfoEnabled()) { LOG.info("Schema on read not provided -- using file schema " + types.toString()); } - treeReaderSchema = new SchemaEvolution(fileSchema, included); + treeReaderSchema = new TreeReaderFactory.TreeReaderSchema().fileTypes(types).schemaTypes(types); } else { // Now that we are creating a record reader for a file, validate that the schema to read // is compatible with the file schema. // - treeReaderSchema = new SchemaEvolution(fileSchema, options.getSchema(), - included); + List<OrcProto.Type> schemaTypes = OrcUtils.getOrcTypes(options.getSchema()); + treeReaderSchema = SchemaEvolution.validateAndCreate(types, schemaTypes); } this.path = path; this.codec = codec; this.types = types; this.bufferSize = bufferSize; + this.included = options.getInclude(); + this.conf = conf; this.rowIndexStride = strideRate; this.metadata = new MetadataReaderImpl(fileSystem, path, codec, bufferSize, types.size()); SearchArgument sarg = options.getSearchArgument(); @@ -207,8 +210,7 @@ public class RecordReaderImpl implements RecordReader { skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf); } - reader = TreeReaderFactory.createTreeReader(treeReaderSchema.getReaderSchema(), - treeReaderSchema, included, skipCorrupt); + reader = TreeReaderFactory.createTreeReader(0, treeReaderSchema, included, skipCorrupt); indexes = new OrcProto.RowIndex[types.size()]; bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()]; advanceToNextRow(reader, 0L, true); @@ -237,7 +239,7 @@ public class RecordReaderImpl implements RecordReader { return metadata.readStripeFooter(stripe); } - enum Location { + static enum Location { BEFORE, MIN, MIDDLE, MAX, AFTER } @@ -1050,27 +1052,31 @@ public class RecordReaderImpl implements RecordReader { } @Override - public boolean nextBatch(VectorizedRowBatch batch) throws IOException { + public VectorizedRowBatch nextBatch(VectorizedRowBatch previous) throws IOException { try { + final VectorizedRowBatch result; if (rowInStripe >= rowCountInStripe) { currentStripe += 1; - if (currentStripe >= stripes.size()) { - batch.size = 0; - return false; - } readStripe(); } - int batchSize = computeBatchSize(batch.getMaxSize()); + final int batchSize = computeBatchSize(VectorizedRowBatch.DEFAULT_SIZE); rowInStripe += batchSize; - reader.setVectorColumnCount(batch.getDataColumnCount()); - reader.nextBatch(batch, batchSize); + if (previous == null) { + ColumnVector[] cols = (ColumnVector[]) reader.nextVector(null, (int) batchSize); + result = new VectorizedRowBatch(cols.length); + result.cols = cols; + } else { + result = previous; + result.selectedInUse = false; + reader.setVectorColumnCount(result.getDataColumnCount()); + reader.nextVector(result.cols, batchSize); + } - batch.size = (int) batchSize; - batch.selectedInUse = false; + result.size = batchSize; advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true); - return batch.size != 0; + return result; } catch (IOException e) { // Rethrow exception with file name in log message throw new IOException("Error reading file: " + path, e); http://git-wip-us.apache.org/repos/asf/hive/blob/d559b347/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java index 6747691..f28ca13 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java @@ -20,12 +20,13 @@ package org.apache.hadoop.hive.ql.io.orc; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.io.orc.TreeReaderFactory.TreeReaderSchema; +import org.apache.orc.OrcProto; +import org.apache.orc.OrcUtils; import org.apache.orc.TypeDescription; /** @@ -33,134 +34,103 @@ import org.apache.orc.TypeDescription; * has been schema evolution. */ public class SchemaEvolution { - private final Map<TypeDescription, TypeDescription> readerToFile; - private final boolean[] included; - private final TypeDescription readerSchema; + private static final Log LOG = LogFactory.getLog(SchemaEvolution.class); - public SchemaEvolution(TypeDescription readerSchema, boolean[] included) { - this.included = included; - readerToFile = null; - this.readerSchema = readerSchema; - } + public static TreeReaderSchema validateAndCreate(List<OrcProto.Type> fileTypes, + List<OrcProto.Type> schemaTypes) throws IOException { - public SchemaEvolution(TypeDescription fileSchema, - TypeDescription readerSchema, - boolean[] included) throws IOException { - readerToFile = new HashMap<>(readerSchema.getMaximumId() + 1); - this.included = included; - if (checkAcidSchema(fileSchema)) { - this.readerSchema = createEventSchema(readerSchema); + // For ACID, the row is the ROW field in the outer STRUCT. + final boolean isAcid = checkAcidSchema(fileTypes); + final List<OrcProto.Type> rowSchema; + int rowSubtype; + if (isAcid) { + rowSubtype = OrcRecordUpdater.ROW + 1; + rowSchema = fileTypes.subList(rowSubtype, fileTypes.size()); } else { - this.readerSchema = readerSchema; + rowSubtype = 0; + rowSchema = fileTypes; } - buildMapping(fileSchema, this.readerSchema); - } - public TypeDescription getReaderSchema() { - return readerSchema; - } + // Do checking on the overlap. Additional columns will be defaulted to NULL. - public TypeDescription getFileType(TypeDescription readerType) { - TypeDescription result; - if (readerToFile == null) { - if (included == null || included[readerType.getId()]) { - result = readerType; - } else { - result = null; - } - } else { - result = readerToFile.get(readerType); - } - return result; - } + int numFileColumns = rowSchema.get(0).getSubtypesCount(); + int numDesiredColumns = schemaTypes.get(0).getSubtypesCount(); - void buildMapping(TypeDescription fileType, - TypeDescription readerType) throws IOException { - // if the column isn't included, don't map it - if (included != null && !included[readerType.getId()]) { - return; - } - boolean isOk = true; - // check the easy case first - if (fileType.getCategory() == readerType.getCategory()) { - switch (readerType.getCategory()) { - case BOOLEAN: - case BYTE: - case SHORT: - case INT: - case LONG: - case DOUBLE: - case FLOAT: - case STRING: - case TIMESTAMP: - case BINARY: - case DATE: - // these are always a match - break; - case CHAR: - case VARCHAR: - isOk = fileType.getMaxLength() == readerType.getMaxLength(); - break; - case DECIMAL: - // TODO we don't enforce scale and precision checks, but probably should - break; - case UNION: - case MAP: - case LIST: { - // these must be an exact match - List<TypeDescription> fileChildren = fileType.getChildren(); - List<TypeDescription> readerChildren = readerType.getChildren(); - if (fileChildren.size() == readerChildren.size()) { - for(int i=0; i < fileChildren.size(); ++i) { - buildMapping(fileChildren.get(i), readerChildren.get(i)); - } - } else { - isOk = false; + int numReadColumns = Math.min(numFileColumns, numDesiredColumns); + + /** + * Check type promotion. + * + * Currently, we only support integer type promotions that can be done "implicitly". + * That is, we know that using a bigger integer tree reader on the original smaller integer + * column will "just work". + * + * In the future, other type promotions might require type conversion. + */ + // short -> int -> bigint as same integer readers are used for the above types. + + for (int i = 0; i < numReadColumns; i++) { + OrcProto.Type fColType = fileTypes.get(rowSubtype + i); + OrcProto.Type rColType = schemaTypes.get(i); + if (!fColType.getKind().equals(rColType.getKind())) { + + boolean ok = false; + if (fColType.getKind().equals(OrcProto.Type.Kind.SHORT)) { + + if (rColType.getKind().equals(OrcProto.Type.Kind.INT) || + rColType.getKind().equals(OrcProto.Type.Kind.LONG)) { + // type promotion possible, converting SHORT to INT/LONG requested type + ok = true; } - break; - } - case STRUCT: { - // allow either side to have fewer fields than the other - List<TypeDescription> fileChildren = fileType.getChildren(); - List<TypeDescription> readerChildren = readerType.getChildren(); - int jointSize = Math.min(fileChildren.size(), readerChildren.size()); - for(int i=0; i < jointSize; ++i) { - buildMapping(fileChildren.get(i), readerChildren.get(i)); + } else if (fColType.getKind().equals(OrcProto.Type.Kind.INT)) { + + if (rColType.getKind().equals(OrcProto.Type.Kind.LONG)) { + // type promotion possible, converting INT to LONG requested type + ok = true; } - break; } - default: - throw new IllegalArgumentException("Unknown type " + readerType); - } - } else { - switch (fileType.getCategory()) { - case SHORT: - if (readerType.getCategory() != TypeDescription.Category.INT && - readerType.getCategory() != TypeDescription.Category.LONG) { - isOk = false; - } - break; - case INT: - if (readerType.getCategory() != TypeDescription.Category.LONG) { - isOk = false; - } - break; - default: - isOk = false; + + if (!ok) { + throw new IOException("ORC does not support type conversion from " + + fColType.getKind().name() + " to " + rColType.getKind().name()); + } } } - if (isOk) { - readerToFile.put(readerType, fileType); + + List<OrcProto.Type> fullSchemaTypes; + + if (isAcid) { + fullSchemaTypes = new ArrayList<OrcProto.Type>(); + + // This copies the ACID struct type which is subtype = 0. + // It has field names "operation" through "row". + // And we copy the types for all fields EXCEPT ROW (which must be last!). + + for (int i = 0; i < rowSubtype; i++) { + fullSchemaTypes.add(fileTypes.get(i).toBuilder().build()); + } + + // Add the row struct type. + OrcUtils.appendOrcTypesRebuildSubtypes(fullSchemaTypes, schemaTypes, 0); } else { - throw new IOException("ORC does not support type conversion from " + - fileType + " to " + readerType); + fullSchemaTypes = schemaTypes; } + + int innerStructSubtype = rowSubtype; + + // LOG.info("Schema evolution: (fileTypes) " + fileTypes.toString() + + // " (schemaEvolutionTypes) " + schemaEvolutionTypes.toString()); + + return new TreeReaderSchema(). + fileTypes(fileTypes). + schemaTypes(fullSchemaTypes). + innerStructSubtype(innerStructSubtype); } - private static boolean checkAcidSchema(TypeDescription type) { - if (type.getCategory().equals(TypeDescription.Category.STRUCT)) { - List<String> rootFields = type.getFieldNames(); + private static boolean checkAcidSchema(List<OrcProto.Type> fileSchema) { + if (fileSchema.get(0).getKind().equals(OrcProto.Type.Kind.STRUCT)) { + List<String> rootFields = fileSchema.get(0).getFieldNamesList(); if (acidEventFieldNames.equals(rootFields)) { return true; } @@ -172,14 +142,26 @@ public class SchemaEvolution { * @param typeDescr * @return ORC types for the ACID event based on the row's type description */ - public static TypeDescription createEventSchema(TypeDescription typeDescr) { - TypeDescription result = TypeDescription.createStruct() - .addField("operation", TypeDescription.createInt()) - .addField("originalTransaction", TypeDescription.createLong()) - .addField("bucket", TypeDescription.createInt()) - .addField("rowId", TypeDescription.createLong()) - .addField("currentTransaction", TypeDescription.createLong()) - .addField("row", typeDescr.clone()); + public static List<OrcProto.Type> createEventSchema(TypeDescription typeDescr) { + + List<OrcProto.Type> result = new ArrayList<OrcProto.Type>(); + + OrcProto.Type.Builder type = OrcProto.Type.newBuilder(); + type.setKind(OrcProto.Type.Kind.STRUCT); + type.addAllFieldNames(acidEventFieldNames); + for (int i = 0; i < acidEventFieldNames.size(); i++) { + type.addSubtypes(i + 1); + } + result.add(type.build()); + + // Automatically add all fields except the last (ROW). + for (int i = 0; i < acidEventOrcTypeKinds.size() - 1; i ++) { + type.clear(); + type.setKind(acidEventOrcTypeKinds.get(i)); + result.add(type.build()); + } + + OrcUtils.appendOrcTypesRebuildSubtypes(result, typeDescr); return result; } @@ -192,4 +174,14 @@ public class SchemaEvolution { acidEventFieldNames.add("currentTransaction"); acidEventFieldNames.add("row"); } + public static final List<OrcProto.Type.Kind> acidEventOrcTypeKinds = + new ArrayList<OrcProto.Type.Kind>(); + static { + acidEventOrcTypeKinds.add(OrcProto.Type.Kind.INT); + acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG); + acidEventOrcTypeKinds.add(OrcProto.Type.Kind.INT); + acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG); + acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG); + acidEventOrcTypeKinds.add(OrcProto.Type.Kind.STRUCT); + } }
