http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java index 9cfcc0e..2199b11 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java @@ -27,12 +27,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import com.google.common.base.Preconditions; -import com.google.common.io.Closer; +import org.apache.hadoop.fs.FileSystem; import org.apache.orc.BooleanColumnStatistics; -import org.apache.orc.DataReaderFactory; -import org.apache.orc.MetadataReaderFactory; -import org.apache.orc.OrcUtils; import org.apache.orc.impl.BufferChunk; import org.apache.orc.ColumnStatistics; import org.apache.orc.impl.ColumnStatisticsImpl; @@ -42,12 +38,9 @@ import org.apache.orc.DateColumnStatistics; import org.apache.orc.DecimalColumnStatistics; import org.apache.orc.DoubleColumnStatistics; import org.apache.orc.impl.DataReaderProperties; -import org.apache.orc.impl.DefaultMetadataReaderFactory; import org.apache.orc.impl.InStream; import org.apache.orc.IntegerColumnStatistics; -import org.apache.orc.impl.MetadataReader; import org.apache.orc.OrcConf; -import org.apache.orc.impl.MetadataReaderProperties; import org.apache.orc.impl.OrcIndex; import org.apache.orc.impl.PositionProvider; import org.apache.orc.impl.StreamName; @@ -56,14 +49,11 @@ import org.apache.orc.StripeInformation; import org.apache.orc.TimestampColumnStatistics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.io.DiskRange; import org.apache.hadoop.hive.common.io.DiskRangeList; import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper; import org.apache.hadoop.hive.common.type.HiveDecimal; -import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.orc.BloomFilterIO; import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; @@ -103,8 +93,6 @@ public class RecordReaderImpl implements RecordReader { private final SargApplier sargApp; // an array about which row groups aren't skipped private boolean[] includedRowGroups = null; - private final Configuration conf; - private final MetadataReader metadata; private final DataReader dataReader; /** @@ -146,130 +134,36 @@ public class RecordReaderImpl implements RecordReader { return result; } - public static Builder builder() { - return new Builder(); - } - - public static class Builder { - private Reader.Options options; - private CompressionCodec codec; - private List<OrcProto.Type> types; - private List<StripeInformation> stripes; - private int bufferSize; - private FileSystem fileSystem; - private Path path; - private Configuration conf; - private long strideRate; - private MetadataReaderFactory metadataReaderFactory = new DefaultMetadataReaderFactory(); - private DataReaderFactory dataReaderFactory = new DefaultDataReaderFactory(); - - private Builder() { - - } - - public Builder withOptions(Reader.Options options) { - this.options = options; - return this; - } - - public Builder withCodec(CompressionCodec codec) { - this.codec = codec; - return 
this; - } - - public Builder withTypes(List<OrcProto.Type> types) { - this.types = types; - return this; - } - - public Builder withStripes(List<StripeInformation> stripes) { - this.stripes = stripes; - return this; - } - - public Builder withBufferSize(int bufferSize) { - this.bufferSize = bufferSize; - return this; - } - - public Builder withFileSystem(FileSystem fileSystem) { - this.fileSystem = fileSystem; - return this; - } - - public Builder withPath(Path path) { - this.path = path; - return this; - } - - public Builder withConf(Configuration conf) { - this.conf = conf; - return this; - } - - public Builder withStrideRate(long strideRate) { - this.strideRate = strideRate; - return this; - } - - public Builder withMetadataReaderFactory(MetadataReaderFactory metadataReaderFactory) { - this.metadataReaderFactory = metadataReaderFactory; - return this; - } - - public Builder withDataReaderFactory(DataReaderFactory dataReaderFactory) { - this.dataReaderFactory = dataReaderFactory; - return this; - } - - public RecordReaderImpl build() throws IOException { - Preconditions.checkNotNull(metadataReaderFactory); - Preconditions.checkNotNull(dataReaderFactory); - Preconditions.checkNotNull(options); - Preconditions.checkNotNull(types); - Preconditions.checkNotNull(stripes); - Preconditions.checkNotNull(fileSystem); - Preconditions.checkNotNull(path); - Preconditions.checkNotNull(conf); - - return new RecordReaderImpl(this); - } - } - - private RecordReaderImpl(Builder builder) throws IOException { - Reader.Options options = builder.options; - this.types = builder.types; - TreeReaderFactory.TreeReaderSchema treeReaderSchema; + protected RecordReaderImpl(ReaderImpl fileReader, + Reader.Options options) throws IOException { + SchemaEvolution treeReaderSchema; + this.included = options.getInclude(); + included[0] = true; if (options.getSchema() == null) { if (LOG.isInfoEnabled()) { - LOG.info("Schema on read not provided -- using file schema " + types.toString()); + LOG.info("Schema on read not provided -- using file schema " + + fileReader.getSchema()); } - treeReaderSchema = new TreeReaderFactory.TreeReaderSchema().fileTypes(types).schemaTypes(types); + treeReaderSchema = new SchemaEvolution(fileReader.getSchema(), included); } else { // Now that we are creating a record reader for a file, validate that the schema to read // is compatible with the file schema. 
// - List<OrcProto.Type> schemaTypes = OrcUtils.getOrcTypes(options.getSchema()); - treeReaderSchema = SchemaEvolution.validateAndCreate(types, schemaTypes); - } - this.path = builder.path; - this.codec = builder.codec; - this.bufferSize = builder.bufferSize; - this.included = options.getInclude(); - this.conf = builder.conf; - this.rowIndexStride = builder.strideRate; - this.metadata = builder.metadataReaderFactory.create(MetadataReaderProperties.builder() - .withFileSystem(builder.fileSystem) - .withPath(path) - .withCodec(codec) - .withBufferSize(bufferSize) - .withTypeCount(types.size()) - .build()); + treeReaderSchema = new SchemaEvolution(fileReader.getSchema(), + options.getSchema(), + included); + } + this.path = fileReader.path; + this.codec = fileReader.codec; + this.types = fileReader.types; + this.bufferSize = fileReader.bufferSize; + this.rowIndexStride = fileReader.rowIndexStride; + FileSystem fileSystem = fileReader.fileSystem; SearchArgument sarg = options.getSearchArgument(); - if (sarg != null && builder.strideRate != 0) { + if (sarg != null && rowIndexStride != 0) { sargApp = new SargApplier( - sarg, options.getColumnNames(), builder.strideRate, types, included.length); + sarg, options.getColumnNames(), rowIndexStride, types, included.length); } else { sargApp = null; } @@ -277,7 +171,7 @@ public class RecordReaderImpl implements RecordReader { long skippedRows = 0; long offset = options.getOffset(); long maxOffset = options.getMaxOffset(); - for(StripeInformation stripe: builder.stripes) { + for(StripeInformation stripe: fileReader.getStripes()) { long stripeStart = stripe.getOffset(); if (offset > stripeStart) { skippedRows += stripe.getNumberOfRows(); @@ -289,25 +183,30 @@ public class RecordReaderImpl implements RecordReader { Boolean zeroCopy = options.getUseZeroCopy(); if (zeroCopy == null) { - zeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(conf); + zeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(fileReader.conf); + } + if (options.getDataReader() == null) { + dataReader = RecordReaderUtils.createDefaultDataReader( + DataReaderProperties.builder() + .withBufferSize(bufferSize) + .withCompression(fileReader.compressionKind) + .withFileSystem(fileSystem) + .withPath(path) + .withTypeCount(types.size()) + .withZeroCopy(zeroCopy) + .build()); + } else { + dataReader = options.getDataReader(); } - // TODO: we could change the ctor to pass this externally - this.dataReader = builder.dataReaderFactory.create(DataReaderProperties.builder() - .withFileSystem(builder.fileSystem) - .withCodec(codec) - .withPath(path) - .withZeroCopy(zeroCopy) - .build()); - this.dataReader.open(); - firstRow = skippedRows; totalRowCount = rows; Boolean skipCorrupt = options.getSkipCorruptRecords(); if (skipCorrupt == null) { - skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf); + skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(fileReader.conf); } - reader = TreeReaderFactory.createTreeReader(0, treeReaderSchema, included, skipCorrupt); + reader = TreeReaderFactory.createTreeReader(treeReaderSchema.getReaderSchema(), + treeReaderSchema, included, skipCorrupt); indexes = new OrcProto.RowIndex[types.size()]; bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()]; advanceToNextRow(reader, 0L, true); @@ -333,10 +232,10 @@ public class RecordReaderImpl implements RecordReader { } OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException { - return metadata.readStripeFooter(stripe); + return dataReader.readStripeFooter(stripe); } - static enum Location { + enum 
Location { BEFORE, MIN, MIDDLE, MAX, AFTER } @@ -895,7 +794,7 @@ public class RecordReaderImpl implements RecordReader { return sargApp.pickRowGroups(stripes.get(currentStripe), indexes, bloomFilterIndices, false); } - private void clearStreams() throws IOException { + private void clearStreams() { // explicit close of all streams to de-ref ByteBuffers for (InStream is : streams.values()) { is.close(); @@ -1149,31 +1048,27 @@ public class RecordReaderImpl implements RecordReader { } @Override - public VectorizedRowBatch nextBatch(VectorizedRowBatch previous) throws IOException { + public boolean nextBatch(VectorizedRowBatch batch) throws IOException { try { - final VectorizedRowBatch result; if (rowInStripe >= rowCountInStripe) { currentStripe += 1; + if (currentStripe >= stripes.size()) { + batch.size = 0; + return false; + } readStripe(); } - final int batchSize = computeBatchSize(VectorizedRowBatch.DEFAULT_SIZE); + int batchSize = computeBatchSize(batch.getMaxSize()); rowInStripe += batchSize; - if (previous == null) { - ColumnVector[] cols = (ColumnVector[]) reader.nextVector(null, (int) batchSize); - result = new VectorizedRowBatch(cols.length); - result.cols = cols; - } else { - result = previous; - result.selectedInUse = false; - reader.setVectorColumnCount(result.getDataColumnCount()); - reader.nextVector(result.cols, batchSize); - } + reader.setVectorColumnCount(batch.getDataColumnCount()); + reader.nextBatch(batch, batchSize); - result.size = batchSize; + batch.size = (int) batchSize; + batch.selectedInUse = false; advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true); - return result; + return batch.size != 0; } catch (IOException e) { // Rethrow exception with file name in log message throw new IOException("Error reading file: " + path, e); @@ -1216,16 +1111,8 @@ public class RecordReaderImpl implements RecordReader { @Override public void close() throws IOException { - Closer closer = Closer.create(); - try { - closer.register(metadata); - closer.register(dataReader); - clearStreams(); - } catch (IOException e) { - throw closer.rethrow(e); - } finally { - closer.close(); - } + clearStreams(); + dataReader.close(); } @Override @@ -1244,10 +1131,6 @@ public class RecordReaderImpl implements RecordReader { return ((float) rowBaseInStripe + rowInStripe) / totalRowCount; } - MetadataReader getMetadataReader() { - return metadata; - } - private int findStripe(long rowNumber) { for (int i = 0; i < stripes.size(); i++) { StripeInformation stripe = stripes.get(i); @@ -1276,8 +1159,8 @@ public class RecordReaderImpl implements RecordReader { sargColumns = sargColumns == null ? (sargApp == null ? null : sargApp.sargColumns) : sargColumns; } - return metadata.readRowIndex(stripe, stripeFooter, included, indexes, sargColumns, - bloomFilterIndex); + return dataReader.readRowIndex(stripe, stripeFooter, included, indexes, + sargColumns, bloomFilterIndex); } private void seekToRowEntry(TreeReaderFactory.TreeReader reader, int rowEntry)
http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java index 177721d..4192588 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; +import com.google.common.collect.Lists; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -36,6 +37,7 @@ import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.HadoopShims.ByteBufferPoolShim; import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim; +import org.apache.orc.StripeInformation; import org.apache.orc.impl.BufferChunk; import org.apache.orc.CompressionCodec; import org.apache.orc.DataReader; @@ -44,6 +46,8 @@ import org.apache.orc.impl.DirectDecompressionCodec; import org.apache.orc.OrcProto; import com.google.common.collect.ComparisonChain; +import org.apache.orc.impl.InStream; +import org.apache.orc.impl.OrcIndex; import org.apache.orc.impl.OutStream; /** @@ -53,34 +57,130 @@ public class RecordReaderUtils { private static final HadoopShims SHIMS = ShimLoader.getHadoopShims(); private static class DefaultDataReader implements DataReader { - private FSDataInputStream file; - private ByteBufferAllocatorPool pool; - private ZeroCopyReaderShim zcr; - private FileSystem fs; - private Path path; - private boolean useZeroCopy; - private CompressionCodec codec; + private FSDataInputStream file = null; + private final ByteBufferAllocatorPool pool; + private ZeroCopyReaderShim zcr = null; + private final FileSystem fs; + private final Path path; + private final boolean useZeroCopy; + private final CompressionCodec codec; + private final int bufferSize; + private final int typeCount; + + private DefaultDataReader(DefaultDataReader other) { + this.pool = other.pool; + this.zcr = other.zcr; + this.bufferSize = other.bufferSize; + this.typeCount = other.typeCount; + this.fs = other.fs; + this.path = other.path; + this.useZeroCopy = other.useZeroCopy; + this.codec = other.codec; + } private DefaultDataReader(DataReaderProperties properties) { this.fs = properties.getFileSystem(); this.path = properties.getPath(); this.useZeroCopy = properties.getZeroCopy(); - this.codec = properties.getCodec(); + this.codec = WriterImpl.createCodec(properties.getCompression()); + this.bufferSize = properties.getBufferSize(); + this.typeCount = properties.getTypeCount(); + if (useZeroCopy) { + this.pool = new ByteBufferAllocatorPool(); + } else { + this.pool = null; + } } @Override public void open() throws IOException { this.file = fs.open(path); if (useZeroCopy) { - pool = new ByteBufferAllocatorPool(); zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool); } else { - pool = null; zcr = null; } } @Override + public OrcIndex readRowIndex(StripeInformation stripe, + OrcProto.StripeFooter footer, + boolean[] included, + OrcProto.RowIndex[] indexes, + boolean[] sargColumns, + OrcProto.BloomFilterIndex[] bloomFilterIndices + ) throws IOException { + if (file == null) { + open(); + } + if (footer == null) { + footer = 
readStripeFooter(stripe); + } + if (indexes == null) { + indexes = new OrcProto.RowIndex[typeCount]; + } + if (bloomFilterIndices == null) { + bloomFilterIndices = new OrcProto.BloomFilterIndex[typeCount]; + } + long offset = stripe.getOffset(); + List<OrcProto.Stream> streams = footer.getStreamsList(); + for (int i = 0; i < streams.size(); i++) { + OrcProto.Stream stream = streams.get(i); + OrcProto.Stream nextStream = null; + if (i < streams.size() - 1) { + nextStream = streams.get(i+1); + } + int col = stream.getColumn(); + int len = (int) stream.getLength(); + // row index stream and bloom filter are interlaced, check if the sarg column contains bloom + // filter and combine the io to read row index and bloom filters for that column together + if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX)) { + boolean readBloomFilter = false; + if (sargColumns != null && sargColumns[col] && + nextStream.getKind() == OrcProto.Stream.Kind.BLOOM_FILTER) { + len += nextStream.getLength(); + i += 1; + readBloomFilter = true; + } + if ((included == null || included[col]) && indexes[col] == null) { + byte[] buffer = new byte[len]; + file.readFully(offset, buffer, 0, buffer.length); + ByteBuffer bb = ByteBuffer.wrap(buffer); + indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index", + Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), stream.getLength(), + codec, bufferSize)); + if (readBloomFilter) { + bb.position((int) stream.getLength()); + bloomFilterIndices[col] = OrcProto.BloomFilterIndex.parseFrom(InStream.create( + "bloom_filter", Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), + nextStream.getLength(), codec, bufferSize)); + } + } + } + offset += len; + } + + OrcIndex index = new OrcIndex(indexes, bloomFilterIndices); + return index; + } + + @Override + public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException { + if (file == null) { + open(); + } + long offset = stripe.getOffset() + stripe.getIndexLength() + stripe.getDataLength(); + int tailLength = (int) stripe.getFooterLength(); + + // read the footer + ByteBuffer tailBuf = ByteBuffer.allocate(tailLength); + file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength); + return OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream("footer", + Lists.<DiskRange>newArrayList(new BufferChunk(tailBuf, 0)), + tailLength, codec, bufferSize)); + } + + @Override public DiskRangeList readFileData( DiskRangeList range, long baseOffset, boolean doForceDirect) throws IOException { return RecordReaderUtils.readDiskRanges(file, zcr, baseOffset, range, doForceDirect); @@ -106,9 +206,14 @@ public class RecordReaderUtils { zcr.releaseBuffer(buffer); } + @Override + public DataReader clone() { + return new DefaultDataReader(this); + } + } - static DataReader createDefaultDataReader(DataReaderProperties properties) { + public static DataReader createDefaultDataReader(DataReaderProperties properties) { return new DefaultDataReader(properties); } http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java index f28ca13..6747691 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java @@ 
-20,13 +20,12 @@ package org.apache.hadoop.hive.ql.io.orc; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.ql.io.orc.TreeReaderFactory.TreeReaderSchema; -import org.apache.orc.OrcProto; -import org.apache.orc.OrcUtils; import org.apache.orc.TypeDescription; /** @@ -34,103 +33,134 @@ import org.apache.orc.TypeDescription; * has been schema evolution. */ public class SchemaEvolution { - + private final Map<TypeDescription, TypeDescription> readerToFile; + private final boolean[] included; + private final TypeDescription readerSchema; private static final Log LOG = LogFactory.getLog(SchemaEvolution.class); - public static TreeReaderSchema validateAndCreate(List<OrcProto.Type> fileTypes, - List<OrcProto.Type> schemaTypes) throws IOException { + public SchemaEvolution(TypeDescription readerSchema, boolean[] included) { + this.included = included; + readerToFile = null; + this.readerSchema = readerSchema; + } - // For ACID, the row is the ROW field in the outer STRUCT. - final boolean isAcid = checkAcidSchema(fileTypes); - final List<OrcProto.Type> rowSchema; - int rowSubtype; - if (isAcid) { - rowSubtype = OrcRecordUpdater.ROW + 1; - rowSchema = fileTypes.subList(rowSubtype, fileTypes.size()); + public SchemaEvolution(TypeDescription fileSchema, + TypeDescription readerSchema, + boolean[] included) throws IOException { + readerToFile = new HashMap<>(readerSchema.getMaximumId() + 1); + this.included = included; + if (checkAcidSchema(fileSchema)) { + this.readerSchema = createEventSchema(readerSchema); } else { - rowSubtype = 0; - rowSchema = fileTypes; + this.readerSchema = readerSchema; } + buildMapping(fileSchema, this.readerSchema); + } - // Do checking on the overlap. Additional columns will be defaulted to NULL. - - int numFileColumns = rowSchema.get(0).getSubtypesCount(); - int numDesiredColumns = schemaTypes.get(0).getSubtypesCount(); - - int numReadColumns = Math.min(numFileColumns, numDesiredColumns); - - /** - * Check type promotion. - * - * Currently, we only support integer type promotions that can be done "implicitly". - * That is, we know that using a bigger integer tree reader on the original smaller integer - * column will "just work". - * - * In the future, other type promotions might require type conversion. - */ - // short -> int -> bigint as same integer readers are used for the above types. 
- - for (int i = 0; i < numReadColumns; i++) { - OrcProto.Type fColType = fileTypes.get(rowSubtype + i); - OrcProto.Type rColType = schemaTypes.get(i); - if (!fColType.getKind().equals(rColType.getKind())) { - - boolean ok = false; - if (fColType.getKind().equals(OrcProto.Type.Kind.SHORT)) { + public TypeDescription getReaderSchema() { + return readerSchema; + } - if (rColType.getKind().equals(OrcProto.Type.Kind.INT) || - rColType.getKind().equals(OrcProto.Type.Kind.LONG)) { - // type promotion possible, converting SHORT to INT/LONG requested type - ok = true; - } - } else if (fColType.getKind().equals(OrcProto.Type.Kind.INT)) { + public TypeDescription getFileType(TypeDescription readerType) { + TypeDescription result; + if (readerToFile == null) { + if (included == null || included[readerType.getId()]) { + result = readerType; + } else { + result = null; + } + } else { + result = readerToFile.get(readerType); + } + return result; + } - if (rColType.getKind().equals(OrcProto.Type.Kind.LONG)) { - // type promotion possible, converting INT to LONG requested type - ok = true; + void buildMapping(TypeDescription fileType, + TypeDescription readerType) throws IOException { + // if the column isn't included, don't map it + if (included != null && !included[readerType.getId()]) { + return; + } + boolean isOk = true; + // check the easy case first + if (fileType.getCategory() == readerType.getCategory()) { + switch (readerType.getCategory()) { + case BOOLEAN: + case BYTE: + case SHORT: + case INT: + case LONG: + case DOUBLE: + case FLOAT: + case STRING: + case TIMESTAMP: + case BINARY: + case DATE: + // these are always a match + break; + case CHAR: + case VARCHAR: + isOk = fileType.getMaxLength() == readerType.getMaxLength(); + break; + case DECIMAL: + // TODO we don't enforce scale and precision checks, but probably should + break; + case UNION: + case MAP: + case LIST: { + // these must be an exact match + List<TypeDescription> fileChildren = fileType.getChildren(); + List<TypeDescription> readerChildren = readerType.getChildren(); + if (fileChildren.size() == readerChildren.size()) { + for(int i=0; i < fileChildren.size(); ++i) { + buildMapping(fileChildren.get(i), readerChildren.get(i)); + } + } else { + isOk = false; } + break; } - - if (!ok) { - throw new IOException("ORC does not support type conversion from " + - fColType.getKind().name() + " to " + rColType.getKind().name()); + case STRUCT: { + // allow either side to have fewer fields than the other + List<TypeDescription> fileChildren = fileType.getChildren(); + List<TypeDescription> readerChildren = readerType.getChildren(); + int jointSize = Math.min(fileChildren.size(), readerChildren.size()); + for(int i=0; i < jointSize; ++i) { + buildMapping(fileChildren.get(i), readerChildren.get(i)); + } + break; } + default: + throw new IllegalArgumentException("Unknown type " + readerType); } - } - - List<OrcProto.Type> fullSchemaTypes; - - if (isAcid) { - fullSchemaTypes = new ArrayList<OrcProto.Type>(); - - // This copies the ACID struct type which is subtype = 0. - // It has field names "operation" through "row". - // And we copy the types for all fields EXCEPT ROW (which must be last!). 
- - for (int i = 0; i < rowSubtype; i++) { - fullSchemaTypes.add(fileTypes.get(i).toBuilder().build()); + } else { + switch (fileType.getCategory()) { + case SHORT: + if (readerType.getCategory() != TypeDescription.Category.INT && + readerType.getCategory() != TypeDescription.Category.LONG) { + isOk = false; + } + break; + case INT: + if (readerType.getCategory() != TypeDescription.Category.LONG) { + isOk = false; + } + break; + default: + isOk = false; } - - // Add the row struct type. - OrcUtils.appendOrcTypesRebuildSubtypes(fullSchemaTypes, schemaTypes, 0); + } + if (isOk) { + readerToFile.put(readerType, fileType); } else { - fullSchemaTypes = schemaTypes; + throw new IOException("ORC does not support type conversion from " + + fileType + " to " + readerType); } - - int innerStructSubtype = rowSubtype; - - // LOG.info("Schema evolution: (fileTypes) " + fileTypes.toString() + - // " (schemaEvolutionTypes) " + schemaEvolutionTypes.toString()); - - return new TreeReaderSchema(). - fileTypes(fileTypes). - schemaTypes(fullSchemaTypes). - innerStructSubtype(innerStructSubtype); } - private static boolean checkAcidSchema(List<OrcProto.Type> fileSchema) { - if (fileSchema.get(0).getKind().equals(OrcProto.Type.Kind.STRUCT)) { - List<String> rootFields = fileSchema.get(0).getFieldNamesList(); + private static boolean checkAcidSchema(TypeDescription type) { + if (type.getCategory().equals(TypeDescription.Category.STRUCT)) { + List<String> rootFields = type.getFieldNames(); if (acidEventFieldNames.equals(rootFields)) { return true; } @@ -142,26 +172,14 @@ public class SchemaEvolution { * @param typeDescr * @return ORC types for the ACID event based on the row's type description */ - public static List<OrcProto.Type> createEventSchema(TypeDescription typeDescr) { - - List<OrcProto.Type> result = new ArrayList<OrcProto.Type>(); - - OrcProto.Type.Builder type = OrcProto.Type.newBuilder(); - type.setKind(OrcProto.Type.Kind.STRUCT); - type.addAllFieldNames(acidEventFieldNames); - for (int i = 0; i < acidEventFieldNames.size(); i++) { - type.addSubtypes(i + 1); - } - result.add(type.build()); - - // Automatically add all fields except the last (ROW). 
- for (int i = 0; i < acidEventOrcTypeKinds.size() - 1; i ++) { - type.clear(); - type.setKind(acidEventOrcTypeKinds.get(i)); - result.add(type.build()); - } - - OrcUtils.appendOrcTypesRebuildSubtypes(result, typeDescr); + public static TypeDescription createEventSchema(TypeDescription typeDescr) { + TypeDescription result = TypeDescription.createStruct() + .addField("operation", TypeDescription.createInt()) + .addField("originalTransaction", TypeDescription.createLong()) + .addField("bucket", TypeDescription.createInt()) + .addField("rowId", TypeDescription.createLong()) + .addField("currentTransaction", TypeDescription.createLong()) + .addField("row", typeDescr.clone()); return result; } @@ -174,14 +192,4 @@ public class SchemaEvolution { acidEventFieldNames.add("currentTransaction"); acidEventFieldNames.add("row"); } - public static final List<OrcProto.Type.Kind> acidEventOrcTypeKinds = - new ArrayList<OrcProto.Type.Kind>(); - static { - acidEventOrcTypeKinds.add(OrcProto.Type.Kind.INT); - acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG); - acidEventOrcTypeKinds.add(OrcProto.Type.Kind.INT); - acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG); - acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG); - acidEventOrcTypeKinds.add(OrcProto.Type.Kind.STRUCT); - } } http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java index 8bb32ea..8ee8cd7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java @@ -24,6 +24,7 @@ import java.sql.Timestamp; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -35,9 +36,12 @@ import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.TimestampUtils; +import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr; import org.apache.hadoop.hive.serde2.io.ByteWritable; @@ -56,8 +60,7 @@ import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.orc.TypeDescription; import org.apache.orc.impl.BitFieldReader; import org.apache.orc.impl.DynamicByteArray; import org.apache.orc.impl.InStream; @@ -75,60 +78,6 @@ import org.apache.orc.impl.StreamName; */ public class TreeReaderFactory { - private static final Logger LOG = - LoggerFactory.getLogger(TreeReaderFactory.class); - 
- public static class TreeReaderSchema { - - /** - * The types in the ORC file. - */ - List<OrcProto.Type> fileTypes; - - /** - * The treeReaderSchema that the reader should read as. - */ - List<OrcProto.Type> schemaTypes; - - /** - * The subtype of the row STRUCT. Different than 0 for ACID. - */ - int innerStructSubtype; - - public TreeReaderSchema() { - fileTypes = null; - schemaTypes = null; - innerStructSubtype = -1; - } - - public TreeReaderSchema fileTypes(List<OrcProto.Type> fileTypes) { - this.fileTypes = fileTypes; - return this; - } - - public TreeReaderSchema schemaTypes(List<OrcProto.Type> schemaTypes) { - this.schemaTypes = schemaTypes; - return this; - } - - public TreeReaderSchema innerStructSubtype(int innerStructSubtype) { - this.innerStructSubtype = innerStructSubtype; - return this; - } - - public List<OrcProto.Type> getFileTypes() { - return fileTypes; - } - - public List<OrcProto.Type> getSchemaTypes() { - return schemaTypes; - } - - public int getInnerStructSubtype() { - return innerStructSubtype; - } - } - public abstract static class TreeReader { protected final int columnId; protected BitFieldReader present = null; @@ -230,36 +179,60 @@ public class TreeReaderFactory { } /** + * Called at the top level to read into the given batch. + * @param batch the batch to read into + * @param batchSize the number of rows to read + * @throws IOException + */ + public void nextBatch(VectorizedRowBatch batch, + int batchSize) throws IOException { + batch.cols[0].reset(); + batch.cols[0].ensureSize(batchSize, false); + nextVector(batch.cols[0], null, batchSize); + } + + /** * Populates the isNull vector array in the previousVector object based on * the present stream values. This function is called from all the child * readers, and they all set the values based on isNull field value. * - * @param previousVector The columnVector object whose isNull value is populated + * @param previous The columnVector object whose isNull value is populated + * @param isNull Whether the each value was null at a higher level. If + * isNull is null, all values are non-null. * @param batchSize Size of the column vector - * @return next column vector * @throws IOException */ - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - ColumnVector result = (ColumnVector) previousVector; - if (present != null) { + public void nextVector(ColumnVector previous, + boolean[] isNull, + int batchSize) throws IOException { + if (present != null || isNull != null) { // Set noNulls and isNull vector of the ColumnVector based on // present stream - result.noNulls = true; + previous.noNulls = true; + boolean allNull = true; for (int i = 0; i < batchSize; i++) { - result.isNull[i] = (present.next() != 1); - if (result.noNulls && result.isNull[i]) { - result.noNulls = false; + if (isNull == null || !isNull[i]) { + if (present != null && present.next() != 1) { + previous.noNulls = false; + previous.isNull[i] = true; + } else { + previous.isNull[i] = false; + allNull = false; + } + } else { + previous.noNulls = false; + previous.isNull[i] = true; } } + previous.isRepeating = !previous.noNulls && allNull; } else { - // There is not present stream, this means that all the values are + // There is no present stream, this means that all the values are // present. 
- result.noNulls = true; + previous.noNulls = true; for (int i = 0; i < batchSize; i++) { - result.isNull[i] = false; + previous.isNull[i] = false; } } - return previousVector; } public BitFieldReader getPresent() { @@ -267,6 +240,46 @@ public class TreeReaderFactory { } } + public static class NullTreeReader extends TreeReader { + + public NullTreeReader(int columnId) throws IOException { + super(columnId); + } + + @Override + public void startStripe(Map<StreamName, InStream> streams, + OrcProto.StripeFooter footer) { + // PASS + } + + @Override + void skipRows(long rows) { + // PASS + } + + @Override + public void seek(PositionProvider position) { + // PASS + } + + @Override + public void seek(PositionProvider[] position) { + // PASS + } + + @Override + Object next(Object previous) { + return null; + } + + @Override + public void nextVector(ColumnVector vector, boolean[] isNull, int size) { + vector.noNulls = false; + vector.isNull[0] = true; + vector.isRepeating = true; + } + } + public static class BooleanTreeReader extends TreeReader { protected BitFieldReader reader = null; @@ -322,20 +335,16 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - final LongColumnVector result; - if (previousVector == null) { - result = new LongColumnVector(); - } else { - result = (LongColumnVector) previousVector; - } + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + LongColumnVector result = (LongColumnVector) previousVector; // Read present/isNull stream - super.nextVector(result, batchSize); + super.nextVector(result, isNull, batchSize); // Read value entries based on isNull entries reader.nextVector(result, batchSize); - return result; } } @@ -387,20 +396,16 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - final LongColumnVector result; - if (previousVector == null) { - result = new LongColumnVector(); - } else { - result = (LongColumnVector) previousVector; - } + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + final LongColumnVector result = (LongColumnVector) previousVector; // Read present/isNull stream - super.nextVector(result, batchSize); + super.nextVector(result, isNull, batchSize); // Read value entries based on isNull entries - reader.nextVector(result, batchSize); - return result; + reader.nextVector(result, result.vector, batchSize); } @Override @@ -473,20 +478,16 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - final LongColumnVector result; - if (previousVector == null) { - result = new LongColumnVector(); - } else { - result = (LongColumnVector) previousVector; - } + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + final LongColumnVector result = (LongColumnVector) previousVector; // Read present/isNull stream - super.nextVector(result, batchSize); + super.nextVector(result, isNull, batchSize); // Read value entries based on isNull entries - reader.nextVector(result, batchSize); - return result; + reader.nextVector(result, result.vector, batchSize); } @Override @@ -559,20 +560,16 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - final 
LongColumnVector result; - if (previousVector == null) { - result = new LongColumnVector(); - } else { - result = (LongColumnVector) previousVector; - } + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + final LongColumnVector result = (LongColumnVector) previousVector; // Read present/isNull stream - super.nextVector(result, batchSize); + super.nextVector(result, isNull, batchSize); // Read value entries based on isNull entries - reader.nextVector(result, batchSize); - return result; + reader.nextVector(result, result.vector, batchSize); } @Override @@ -646,20 +643,16 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - final LongColumnVector result; - if (previousVector == null) { - result = new LongColumnVector(); - } else { - result = (LongColumnVector) previousVector; - } + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + final LongColumnVector result = (LongColumnVector) previousVector; // Read present/isNull stream - super.nextVector(result, batchSize); + super.nextVector(result, isNull, batchSize); // Read value entries based on isNull entries - reader.nextVector(result, batchSize); - return result; + reader.nextVector(result, result.vector, batchSize); } @Override @@ -719,16 +712,13 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - final DoubleColumnVector result; - if (previousVector == null) { - result = new DoubleColumnVector(); - } else { - result = (DoubleColumnVector) previousVector; - } + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + final DoubleColumnVector result = (DoubleColumnVector) previousVector; // Read present/isNull stream - super.nextVector(result, batchSize); + super.nextVector(result, isNull, batchSize); final boolean hasNulls = !result.noNulls; boolean allNulls = hasNulls; @@ -768,7 +758,6 @@ public class TreeReaderFactory { } result.isRepeating = repeating; } - return result; } @Override @@ -832,16 +821,13 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - final DoubleColumnVector result; - if (previousVector == null) { - result = new DoubleColumnVector(); - } else { - result = (DoubleColumnVector) previousVector; - } + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + final DoubleColumnVector result = (DoubleColumnVector) previousVector; // Read present/isNull stream - super.nextVector(result, batchSize); + super.nextVector(result, isNull, batchSize); final boolean hasNulls = !result.noNulls; boolean allNulls = hasNulls; @@ -881,8 +867,6 @@ public class TreeReaderFactory { } result.isRepeating = repeating; } - - return result; } @Override @@ -974,19 +958,15 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - final BytesColumnVector result; - if (previousVector == null) { - result = new BytesColumnVector(); - } else { - result = (BytesColumnVector) previousVector; - } + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + final BytesColumnVector result = (BytesColumnVector) previousVector; // Read 
present/isNull stream - super.nextVector(result, batchSize); + super.nextVector(result, isNull, batchSize); BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv, result, batchSize); - return result; } @Override @@ -1011,7 +991,6 @@ public class TreeReaderFactory { private final TimeZone readerTimeZone; private TimeZone writerTimeZone; private boolean hasSameTZRules; - private TimestampWritable scratchTimestampWritable; TimestampTreeReader(int columnId, boolean skipCorrupt) throws IOException { this(columnId, null, null, null, null, skipCorrupt); @@ -1115,9 +1094,9 @@ public class TreeReaderFactory { int newNanos = parseNanos(nanos.next()); // fix the rounding when we divided by 1000. if (millis >= 0) { - millis += newNanos / 1000000; + millis += newNanos / WriterImpl.NANOS_PER_MILLI; } else { - millis -= newNanos / 1000000; + millis -= newNanos / WriterImpl.NANOS_PER_MILLI; } long offset = 0; // If reader and writer time zones have different rules, adjust the timezone difference @@ -1144,31 +1123,45 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - final TimestampColumnVector result; - if (previousVector == null) { - result = new TimestampColumnVector(); - } else { - result = (TimestampColumnVector) previousVector; - } + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + TimestampColumnVector result = (TimestampColumnVector) previousVector; + super.nextVector(previousVector, isNull, batchSize); - result.reset(); - if (scratchTimestampWritable == null) { - scratchTimestampWritable = new TimestampWritable(); - } - Object obj; for (int i = 0; i < batchSize; i++) { - obj = next(scratchTimestampWritable); - if (obj == null) { - result.noNulls = false; - result.isNull[i] = true; - } else { - TimestampWritable writable = (TimestampWritable) obj; - result.set(i, writable.getTimestamp()); + if (result.noNulls || !result.isNull[i]) { + long millis = data.next() + base_timestamp; + int newNanos = parseNanos(nanos.next()); + if (millis < 0 && newNanos != 0) { + millis -= 1; + } + millis *= WriterImpl.MILLIS_PER_SECOND; + long offset = 0; + // If reader and writer time zones have different rules, adjust the timezone difference + // between reader and writer taking day light savings into account. + if (!hasSameTZRules) { + offset = writerTimeZone.getOffset(millis) - readerTimeZone.getOffset(millis); + } + long adjustedMillis = millis + offset; + // Sometimes the reader timezone might have changed after adding the adjustedMillis. + // To account for that change, check for any difference in reader timezone after + // adding adjustedMillis. If so use the new offset (offset at adjustedMillis point of time). 
+ if (!hasSameTZRules && + (readerTimeZone.getOffset(millis) != readerTimeZone.getOffset(adjustedMillis))) { + long newOffset = + writerTimeZone.getOffset(millis) - readerTimeZone.getOffset(adjustedMillis); + adjustedMillis = millis + newOffset; + } + result.time[i] = adjustedMillis; + result.nanos[i] = newNanos; + if (result.isRepeating && i != 0 && + (result.time[0] != result.time[i] || + result.nanos[0] != result.nanos[i])) { + result.isRepeating = false; + } } } - - return result; } private static int parseNanos(long serialized) { @@ -1253,20 +1246,16 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - final LongColumnVector result; - if (previousVector == null) { - result = new LongColumnVector(); - } else { - result = (LongColumnVector) previousVector; - } + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + final LongColumnVector result = (LongColumnVector) previousVector; // Read present/isNull stream - super.nextVector(result, batchSize); + super.nextVector(result, isNull, batchSize); // Read value entries based on isNull entries - reader.nextVector(result, batchSize); - return result; + reader.nextVector(result, result.vector, batchSize); } @Override @@ -1278,7 +1267,7 @@ public class TreeReaderFactory { public static class DecimalTreeReader extends TreeReader { protected InStream valueStream; protected IntegerReader scaleReader = null; - private LongColumnVector scratchScaleVector; + private int[] scratchScaleVector; private final int precision; private final int scale; @@ -1293,7 +1282,7 @@ public class TreeReaderFactory { super(columnId, present); this.precision = precision; this.scale = scale; - this.scratchScaleVector = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE); + this.scratchScaleVector = new int[VectorizedRowBatch.DEFAULT_SIZE]; this.valueStream = valueStream; if (scaleStream != null && encoding != null) { checkEncoding(encoding); @@ -1352,46 +1341,34 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - final DecimalColumnVector result; - if (previousVector == null) { - result = new DecimalColumnVector(precision, scale); - } else { - result = (DecimalColumnVector) previousVector; - } - - // Save the reference for isNull in the scratch vector - boolean[] scratchIsNull = scratchScaleVector.isNull; + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + final DecimalColumnVector result = (DecimalColumnVector) previousVector; // Read present/isNull stream - super.nextVector(result, batchSize); + super.nextVector(result, isNull, batchSize); + if (batchSize > scratchScaleVector.length) { + scratchScaleVector = new int[(int) batchSize]; + } + scaleReader.nextVector(result, scratchScaleVector, batchSize); // Read value entries based on isNull entries - if (result.isRepeating) { - if (!result.isNull[0]) { + if (result.noNulls) { + for (int r=0; r < batchSize; ++r) { BigInteger bInt = SerializationUtils.readBigInteger(valueStream); - short scaleInData = (short) scaleReader.next(); - HiveDecimal dec = HiveDecimal.create(bInt, scaleInData); - dec = HiveDecimal.enforcePrecisionScale(dec, precision, scale); - result.set(0, dec); + HiveDecimal dec = HiveDecimal.create(bInt, scratchScaleVector[r]); + result.set(r, dec); } - } else { - // result vector has isNull values set, use the 
same to read scale vector. - scratchScaleVector.isNull = result.isNull; - scaleReader.nextVector(scratchScaleVector, batchSize); - for (int i = 0; i < batchSize; i++) { - if (!result.isNull[i]) { + } else if (!result.isRepeating || !result.isNull[0]) { + for (int r=0; r < batchSize; ++r) { + if (!result.isNull[r]) { BigInteger bInt = SerializationUtils.readBigInteger(valueStream); - short scaleInData = (short) scratchScaleVector.vector[i]; - HiveDecimal dec = HiveDecimal.create(bInt, scaleInData); - dec = HiveDecimal.enforcePrecisionScale(dec, precision, scale); - result.set(i, dec); + HiveDecimal dec = HiveDecimal.create(bInt, scratchScaleVector[r]); + result.set(r, dec); } } } - // Switch back the null vector. - scratchScaleVector.isNull = scratchIsNull; - return result; } @Override @@ -1481,8 +1458,10 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - return reader.nextVector(previousVector, batchSize); + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + reader.nextVector(previousVector, isNull, batchSize); } @Override @@ -1501,7 +1480,7 @@ public class TreeReaderFactory { BytesColumnVector result, final int batchSize) throws IOException { // Read lengths scratchlcv.isNull = result.isNull; // Notice we are replacing the isNull vector here... - lengths.nextVector(scratchlcv, batchSize); + lengths.nextVector(scratchlcv, scratchlcv.vector, batchSize); int totalLength = 0; if (!scratchlcv.isRepeating) { for (int i = 0; i < batchSize; i++) { @@ -1532,31 +1511,35 @@ public class TreeReaderFactory { } // This method has the common code for reading in bytes into a BytesColumnVector. - public static void readOrcByteArrays(InStream stream, IntegerReader lengths, - LongColumnVector scratchlcv, - BytesColumnVector result, final int batchSize) throws IOException { - - byte[] allBytes = commonReadByteArrays(stream, lengths, scratchlcv, result, batchSize); - - // Too expensive to figure out 'repeating' by comparisons. - result.isRepeating = false; - int offset = 0; - if (!scratchlcv.isRepeating) { - for (int i = 0; i < batchSize; i++) { - if (!scratchlcv.isNull[i]) { - result.setRef(i, allBytes, offset, (int) scratchlcv.vector[i]); - offset += scratchlcv.vector[i]; - } else { - result.setRef(i, allBytes, 0, 0); + public static void readOrcByteArrays(InStream stream, + IntegerReader lengths, + LongColumnVector scratchlcv, + BytesColumnVector result, + int batchSize) throws IOException { + if (result.noNulls || !(result.isRepeating && result.isNull[0])) { + byte[] allBytes = commonReadByteArrays(stream, lengths, scratchlcv, + result, (int) batchSize); + + // Too expensive to figure out 'repeating' by comparisons. 
+ result.isRepeating = false; + int offset = 0; + if (!scratchlcv.isRepeating) { + for (int i = 0; i < batchSize; i++) { + if (!scratchlcv.isNull[i]) { + result.setRef(i, allBytes, offset, (int) scratchlcv.vector[i]); + offset += scratchlcv.vector[i]; + } else { + result.setRef(i, allBytes, 0, 0); + } } - } - } else { - for (int i = 0; i < batchSize; i++) { - if (!scratchlcv.isNull[i]) { - result.setRef(i, allBytes, offset, (int) scratchlcv.vector[0]); - offset += scratchlcv.vector[0]; - } else { - result.setRef(i, allBytes, 0, 0); + } else { + for (int i = 0; i < batchSize; i++) { + if (!scratchlcv.isNull[i]) { + result.setRef(i, allBytes, offset, (int) scratchlcv.vector[0]); + offset += scratchlcv.vector[0]; + } else { + result.setRef(i, allBytes, 0, 0); + } } } } @@ -1641,19 +1624,16 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - final BytesColumnVector result; - if (previousVector == null) { - result = new BytesColumnVector(); - } else { - result = (BytesColumnVector) previousVector; - } + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + final BytesColumnVector result = (BytesColumnVector) previousVector; // Read present/isNull stream - super.nextVector(result, batchSize); + super.nextVector(result, isNull, batchSize); - BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv, result, batchSize); - return result; + BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv, + result, batchSize); } @Override @@ -1816,18 +1796,15 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { - final BytesColumnVector result; + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { + final BytesColumnVector result = (BytesColumnVector) previousVector; int offset; int length; - if (previousVector == null) { - result = new BytesColumnVector(); - } else { - result = (BytesColumnVector) previousVector; - } // Read present/isNull stream - super.nextVector(result, batchSize); + super.nextVector(result, isNull, batchSize); if (dictionaryBuffer != null) { @@ -1838,7 +1815,8 @@ public class TreeReaderFactory { // Read string offsets scratchlcv.isNull = result.isNull; - reader.nextVector(scratchlcv, batchSize); + scratchlcv.ensureSize((int) batchSize, false); + reader.nextVector(scratchlcv, scratchlcv.vector, batchSize); if (!scratchlcv.isRepeating) { // The vector has non-repeating strings. Iterate thru the batch @@ -1878,7 +1856,6 @@ public class TreeReaderFactory { } } } - return result; } int getDictionaryEntryLength(int entry, int offset) { @@ -1936,11 +1913,13 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { // Get the vector of strings from StringTreeReader, then make a 2nd pass to // adjust down the length (right trim and truncate) if necessary. 
- BytesColumnVector result = (BytesColumnVector) super.nextVector(previousVector, batchSize); - + super.nextVector(previousVector, isNull, batchSize); + BytesColumnVector result = (BytesColumnVector) previousVector; int adjustedDownLen; if (result.isRepeating) { if (result.noNulls || !result.isNull[0]) { @@ -1973,7 +1952,6 @@ public class TreeReaderFactory { } } } - return result; } } @@ -2010,10 +1988,13 @@ public class TreeReaderFactory { } @Override - public Object nextVector(Object previousVector, final int batchSize) throws IOException { + public void nextVector(ColumnVector previousVector, + boolean[] isNull, + int batchSize) throws IOException { // Get the vector of strings from StringTreeReader, then make a 2nd pass to // adjust down the length (truncate) if necessary. - BytesColumnVector result = (BytesColumnVector) super.nextVector(previousVector, batchSize); + super.nextVector(previousVector, isNull, batchSize); + BytesColumnVector result = (BytesColumnVector) previousVector; int adjustedDownLen; if (result.isRepeating) { @@ -2045,62 +2026,26 @@ public class TreeReaderFactory { } } } - return result; } } protected static class StructTreeReader extends TreeReader { - private final int readColumnCount; - private final int resultColumnCount; protected final TreeReader[] fields; - private final String[] fieldNames; - protected StructTreeReader( - int columnId, - TreeReaderSchema treeReaderSchema, - boolean[] included, - boolean skipCorrupt) throws IOException { + protected StructTreeReader(int columnId, + TypeDescription readerSchema, + SchemaEvolution treeReaderSchema, + boolean[] included, + boolean skipCorrupt) throws IOException { super(columnId); - OrcProto.Type fileStructType = treeReaderSchema.getFileTypes().get(columnId); - - OrcProto.Type schemaStructType = treeReaderSchema.getSchemaTypes().get(columnId); + TypeDescription fileSchema = treeReaderSchema.getFileType(readerSchema); - readColumnCount = Math.min(fileStructType.getFieldNamesCount(), schemaStructType.getFieldNamesCount()); - - if (columnId == treeReaderSchema.getInnerStructSubtype()) { - // If there are more result columns than reader columns, we will default those additional - // columns to NULL. - resultColumnCount = schemaStructType.getFieldNamesCount(); - } else { - resultColumnCount = readColumnCount; - } - - this.fields = new TreeReader[readColumnCount]; - this.fieldNames = new String[readColumnCount]; - - if (included == null) { - for (int i = 0; i < readColumnCount; ++i) { - int subtype = schemaStructType.getSubtypes(i); - this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt); - // Use the treeReaderSchema evolution name since file/reader types may not have the real column name. - this.fieldNames[i] = schemaStructType.getFieldNames(i); - } - } else { - for (int i = 0; i < readColumnCount; ++i) { - int subtype = schemaStructType.getSubtypes(i); - if (subtype >= included.length) { - throw new IOException("subtype " + subtype + " exceeds the included array size " + - included.length + " fileTypes " + treeReaderSchema.getFileTypes().toString() + - " schemaTypes " + treeReaderSchema.getSchemaTypes().toString() + - " innerStructSubtype " + treeReaderSchema.getInnerStructSubtype()); - } - if (included[subtype]) { - this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt); - } - // Use the treeReaderSchema evolution name since file/reader types may not have the real column name. 
  protected static class StructTreeReader extends TreeReader {
-    private final int readColumnCount;
-    private final int resultColumnCount;
    protected final TreeReader[] fields;
-    private final String[] fieldNames;
-    protected StructTreeReader(
-        int columnId,
-        TreeReaderSchema treeReaderSchema,
-        boolean[] included,
-        boolean skipCorrupt) throws IOException {
+    protected StructTreeReader(int columnId,
+                               TypeDescription readerSchema,
+                               SchemaEvolution treeReaderSchema,
+                               boolean[] included,
+                               boolean skipCorrupt) throws IOException {
      super(columnId);
-      OrcProto.Type fileStructType = treeReaderSchema.getFileTypes().get(columnId);
-
-      OrcProto.Type schemaStructType = treeReaderSchema.getSchemaTypes().get(columnId);
+      TypeDescription fileSchema = treeReaderSchema.getFileType(readerSchema);
-      readColumnCount = Math.min(fileStructType.getFieldNamesCount(), schemaStructType.getFieldNamesCount());
-
-      if (columnId == treeReaderSchema.getInnerStructSubtype()) {
-        // If there are more result columns than reader columns, we will default those additional
-        // columns to NULL.
-        resultColumnCount = schemaStructType.getFieldNamesCount();
-      } else {
-        resultColumnCount = readColumnCount;
-      }
-
-      this.fields = new TreeReader[readColumnCount];
-      this.fieldNames = new String[readColumnCount];
-
-      if (included == null) {
-        for (int i = 0; i < readColumnCount; ++i) {
-          int subtype = schemaStructType.getSubtypes(i);
-          this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
-          // Use the treeReaderSchema evolution name since file/reader types may not have the real column name.
-          this.fieldNames[i] = schemaStructType.getFieldNames(i);
-        }
-      } else {
-        for (int i = 0; i < readColumnCount; ++i) {
-          int subtype = schemaStructType.getSubtypes(i);
-          if (subtype >= included.length) {
-            throw new IOException("subtype " + subtype + " exceeds the included array size " +
-                included.length + " fileTypes " + treeReaderSchema.getFileTypes().toString() +
-                " schemaTypes " + treeReaderSchema.getSchemaTypes().toString() +
-                " innerStructSubtype " + treeReaderSchema.getInnerStructSubtype());
-          }
-          if (included[subtype]) {
-            this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
-          }
-          // Use the treeReaderSchema evolution name since file/reader types may not have the real column name.
-          this.fieldNames[i] = schemaStructType.getFieldNames(i);
-        }
+      List<TypeDescription> childrenTypes = readerSchema.getChildren();
+      this.fields = new TreeReader[childrenTypes.size()];
+      for (int i = 0; i < fields.length; ++i) {
+        TypeDescription subtype = childrenTypes.get(i);
+        this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
      }
    }
@@ -2120,65 +2065,52 @@ public class TreeReaderFactory {
      OrcStruct result = null;
      if (valuePresent) {
        if (previous == null) {
-          result = new OrcStruct(resultColumnCount);
+          result = new OrcStruct(fields.length);
        } else {
          result = (OrcStruct) previous;
          // If the input format was initialized with a file with a
          // different number of fields, the number of fields needs to
          // be updated to the correct number
-          if (result.getNumFields() != resultColumnCount) {
-            result.setNumFields(resultColumnCount);
-          }
+          result.setNumFields(fields.length);
        }
-        for (int i = 0; i < readColumnCount; ++i) {
+        for (int i = 0; i < fields.length; ++i) {
          if (fields[i] != null) {
            result.setFieldValue(i, fields[i].next(result.getFieldValue(i)));
          }
        }
-        if (resultColumnCount > readColumnCount) {
-          for (int i = readColumnCount; i < resultColumnCount; ++i) {
-            // Default new treeReaderSchema evolution fields to NULL.
-            result.setFieldValue(i, null);
-          }
-        }
      }
      return result;
    }
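Note: with SchemaEvolution resolving reader columns to file columns, the struct reader now simply mirrors the reader schema's children, so the readColumnCount/resultColumnCount bookkeeping and the fieldNames array disappear; a reader column that the file does not contain is handled further down in this diff by createTreeReader substituting a NullTreeReader. A small illustration of the kind of reader schema that drives this constructor (the column names are made up for the example):

    // A reader schema with a trailing column "v" that older files may lack.
    TypeDescription readerSchema = TypeDescription.createStruct()
        .addField("id", TypeDescription.createLong())
        .addField("name", TypeDescription.createString())
        .addField("v", TypeDescription.createInt());
    // fields.length is always 3 here, one child reader per declared field;
    // the reader for a missing "v" just produces nulls instead of being skipped.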
    @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      final ColumnVector[] result;
-      if (previousVector == null) {
-        result = new ColumnVector[readColumnCount];
-      } else {
-        result = (ColumnVector[]) previousVector;
+    public void nextBatch(VectorizedRowBatch batch,
+                          int batchSize) throws IOException {
+      for(int i=0; i < fields.length &&
+              (vectorColumnCount == -1 || i < vectorColumnCount); ++i) {
+        batch.cols[i].reset();
+        batch.cols[i].ensureSize((int) batchSize, false);
+        fields[i].nextVector(batch.cols[i], null, batchSize);
      }
+    }
-      // Read all the members of struct as column vectors
-      for (int i = 0; i < readColumnCount; i++) {
-        if (fields[i] != null) {
-          if (result[i] == null) {
-            result[i] = (ColumnVector) fields[i].nextVector(null, batchSize);
-          } else {
-            fields[i].nextVector(result[i], batchSize);
-          }
-        }
-      }
+    @Override
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      super.nextVector(previousVector, isNull, batchSize);
+      StructColumnVector result = (StructColumnVector) previousVector;
+      if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
+        result.isRepeating = false;
-      // Default additional treeReaderSchema evolution fields to NULL.
-      if (vectorColumnCount != -1 && vectorColumnCount > readColumnCount) {
-        for (int i = readColumnCount; i < vectorColumnCount; ++i) {
-          ColumnVector colVector = result[i];
-          if (colVector != null) {
-            colVector.isRepeating = true;
-            colVector.noNulls = false;
-            colVector.isNull[0] = true;
+        // Read all the members of struct as column vectors
+        boolean[] mask = result.noNulls ? null : result.isNull;
+        for (int f = 0; f < fields.length; f++) {
+          if (fields[f] != null) {
+            fields[f].nextVector(result.fields[f], mask, batchSize);
          }
        }
      }
-
-      return result;
    }
    @Override
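Note: at the top level the struct reader now fills a VectorizedRowBatch directly, resetting and sizing each column vector before delegating to the per-column readers (vectorColumnCount caps the loop when the batch carries extra columns such as partition columns); for nested structs, the struct's own null mask is handed down so child readers skip rows where the parent itself is null. A hedged caller-side sketch of the batch path; the schema string and variable names are illustrative, not from the patch:

    // Sketch: driving the new batch API from a caller.
    TypeDescription schema =
        TypeDescription.fromString("struct<id:bigint,name:string>");
    VectorizedRowBatch batch = schema.createRowBatch();
    // The root StructTreeReader fills one ColumnVector per top-level field.
    rootReader.nextBatch(batch, batch.getMaxSize());
    // The caller (RecordReaderImpl in this patch) sets batch.size to the
    // number of rows actually read.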
@@ -2208,19 +2140,18 @@ public class TreeReaderFactory {
    protected final TreeReader[] fields;
    protected RunLengthByteReader tags;
-    protected UnionTreeReader(int columnId,
-        TreeReaderSchema treeReaderSchema,
-        boolean[] included,
-        boolean skipCorrupt) throws IOException {
-      super(columnId);
-      OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
-      int fieldCount = type.getSubtypesCount();
+    protected UnionTreeReader(int fileColumn,
+                              TypeDescription readerSchema,
+                              SchemaEvolution treeReaderSchema,
+                              boolean[] included,
+                              boolean skipCorrupt) throws IOException {
+      super(fileColumn);
+      List<TypeDescription> childrenTypes = readerSchema.getChildren();
+      int fieldCount = childrenTypes.size();
      this.fields = new TreeReader[fieldCount];
      for (int i = 0; i < fieldCount; ++i) {
-        int subtype = type.getSubtypes(i);
-        if (included == null || included[subtype]) {
-          this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
-        }
+        TypeDescription subtype = childrenTypes.get(i);
+        this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
      }
    }
@@ -2252,9 +2183,25 @@ public class TreeReaderFactory {
    }
    @Override
-    public Object nextVector(Object previousVector, final int batchSize) throws IOException {
-      throw new UnsupportedOperationException(
-          "NextVector is not supported operation for Union type");
+    public void nextVector(ColumnVector previousVector,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      UnionColumnVector result = (UnionColumnVector) previousVector;
+      super.nextVector(result, isNull, batchSize);
+      if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
+        result.isRepeating = false;
+        tags.nextVector(result.noNulls ? null : result.isNull, result.tags,
+            batchSize);
+        boolean[] ignore = new boolean[(int) batchSize];
+        for (int f = 0; f < result.fields.length; ++f) {
+          // build the ignore list for this tag
+          for (int r = 0; r < batchSize; ++r) {
+            ignore[r] = (!result.noNulls && result.isNull[r]) ||
+                result.tags[r] != f;
+          }
+          fields[f].nextVector(result.fields[f], ignore, batchSize);
+        }
+      }
    }
    @Override
@@ -2288,13 +2235,15 @@ public class TreeReaderFactory {
    protected final TreeReader elementReader;
    protected IntegerReader lengths = null;
-    protected ListTreeReader(int columnId,
-        TreeReaderSchema treeReaderSchema,
-        boolean[] included,
-        boolean skipCorrupt) throws IOException {
-      super(columnId);
-      OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
-      elementReader = createTreeReader(type.getSubtypes(0), treeReaderSchema, included, skipCorrupt);
+    protected ListTreeReader(int fileColumn,
+                             TypeDescription readerSchema,
+                             SchemaEvolution treeReaderSchema,
+                             boolean[] included,
+                             boolean skipCorrupt) throws IOException {
+      super(fileColumn);
+      TypeDescription elementType = readerSchema.getChildren().get(0);
+      elementReader = createTreeReader(elementType, treeReaderSchema, included,
+          skipCorrupt);
    }
    @Override
@@ -2335,9 +2284,27 @@ public class TreeReaderFactory {
    }
    @Override
-    public Object nextVector(Object previous, final int batchSize) throws IOException {
-      throw new UnsupportedOperationException(
-          "NextVector is not supported operation for List type");
+    public void nextVector(ColumnVector previous,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      ListColumnVector result = (ListColumnVector) previous;
+      super.nextVector(result, isNull, batchSize);
+      // if we have some none-null values, then read them
+      if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
+        lengths.nextVector(result, result.lengths, batchSize);
+        // even with repeating lengths, the list doesn't repeat
+        result.isRepeating = false;
+        // build the offsets vector and figure out how many children to read
+        result.childCount = 0;
+        for (int r = 0; r < batchSize; ++r) {
+          if (result.noNulls || !result.isNull[r]) {
+            result.offsets[r] = result.childCount;
+            result.childCount += result.lengths[r];
+          }
+        }
+        result.child.ensureSize(result.childCount, false);
+        elementReader.nextVector(result.child, null, result.childCount);
+      }
    }
    @Override
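Note: for lists the reader decodes the per-row lengths, turns them into running offsets plus a total childCount, and then reads exactly childCount element values into result.child; unions instead read the tag stream and build a per-field ignore mask so each child reader only materializes the rows carrying its tag. A hedged sketch of walking the resulting ListColumnVector (the batch, row index r, and the process() helper are illustrative, not from the patch):

    // Sketch: iterating the elements of row r after nextVector() has filled the vector.
    ListColumnVector list = (ListColumnVector) batch.cols[0];
    if (list.noNulls || !list.isNull[r]) {
      long start = list.offsets[r];
      long count = list.lengths[r];
      for (long i = start; i < start + count; ++i) {
        process(list.child, (int) i);   // hypothetical per-element consumer
      }
    }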
@@ -2378,24 +2345,16 @@ public class TreeReaderFactory {
    protected final TreeReader valueReader;
    protected IntegerReader lengths = null;
-    protected MapTreeReader(int columnId,
-        TreeReaderSchema treeReaderSchema,
-        boolean[] included,
-        boolean skipCorrupt) throws IOException {
-      super(columnId);
-      OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
-      int keyColumn = type.getSubtypes(0);
-      int valueColumn = type.getSubtypes(1);
-      if (included == null || included[keyColumn]) {
-        keyReader = createTreeReader(keyColumn, treeReaderSchema, included, skipCorrupt);
-      } else {
-        keyReader = null;
-      }
-      if (included == null || included[valueColumn]) {
-        valueReader = createTreeReader(valueColumn, treeReaderSchema, included, skipCorrupt);
-      } else {
-        valueReader = null;
-      }
+    protected MapTreeReader(int fileColumn,
+                            TypeDescription readerSchema,
+                            SchemaEvolution treeReaderSchema,
+                            boolean[] included,
+                            boolean skipCorrupt) throws IOException {
+      super(fileColumn);
+      TypeDescription keyType = readerSchema.getChildren().get(0);
+      TypeDescription valueType = readerSchema.getChildren().get(1);
+      keyReader = createTreeReader(keyType, treeReaderSchema, included, skipCorrupt);
+      valueReader = createTreeReader(valueType, treeReaderSchema, included, skipCorrupt);
    }
    @Override
@@ -2429,9 +2388,28 @@ public class TreeReaderFactory {
    }
    @Override
-    public Object nextVector(Object previous, final int batchSize) throws IOException {
-      throw new UnsupportedOperationException(
-          "NextVector is not supported operation for Map type");
+    public void nextVector(ColumnVector previous,
+                           boolean[] isNull,
+                           int batchSize) throws IOException {
+      MapColumnVector result = (MapColumnVector) previous;
+      super.nextVector(result, isNull, batchSize);
+      if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
+        lengths.nextVector(result, result.lengths, batchSize);
+        // even with repeating lengths, the map doesn't repeat
+        result.isRepeating = false;
+        // build the offsets vector and figure out how many children to read
+        result.childCount = 0;
+        for (int r = 0; r < batchSize; ++r) {
+          if (result.noNulls || !result.isNull[r]) {
+            result.offsets[r] = result.childCount;
+            result.childCount += result.lengths[r];
+          }
+        }
+        result.keys.ensureSize(result.childCount, false);
+        result.values.ensureSize(result.childCount, false);
+        keyReader.nextVector(result.keys, null, result.childCount);
+        valueReader.nextVector(result.values, null, result.childCount);
+      }
    }
    @Override
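Note: the map reader follows the same offsets/childCount pattern with two parallel child vectors, and both keyReader and valueReader are now built unconditionally; the decision to skip unread columns moves into createTreeReader (next hunk), which substitutes a null-producing reader instead of leaving the field null here. A hedged sketch of reading an entry back out of the filled MapColumnVector (the batch, row index r, and the handleEntry() helper are illustrative):

    // Sketch: entry j of row r after nextVector() has filled the map vector.
    MapColumnVector map = (MapColumnVector) batch.cols[0];
    if (map.noNulls || !map.isNull[r]) {
      long base = map.offsets[r];
      for (long j = 0; j < map.lengths[r]; ++j) {
        int idx = (int) (base + j);
        handleEntry(map.keys, map.values, idx);   // hypothetical consumer
      }
    }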
@@ -2471,61 +2449,61 @@ public class TreeReaderFactory {
    }
  }
-  public static TreeReader createTreeReader(int columnId,
-      TreeReaderSchema treeReaderSchema,
-      boolean[] included,
-      boolean skipCorrupt
-  ) throws IOException {
-    OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
-    switch (type.getKind()) {
+  public static TreeReader createTreeReader(TypeDescription readerType,
+                                            SchemaEvolution evolution,
+                                            boolean[] included,
+                                            boolean skipCorrupt
+                                            ) throws IOException {
+    TypeDescription fileType = evolution.getFileType(readerType);
+    if (fileType == null ||
+        (included != null && !included[readerType.getId()])) {
+      return new NullTreeReader(0);
+    }
+    switch (readerType.getCategory()) {
      case BOOLEAN:
-        return new BooleanTreeReader(columnId);
+        return new BooleanTreeReader(fileType.getId());
      case BYTE:
-        return new ByteTreeReader(columnId);
+        return new ByteTreeReader(fileType.getId());
      case DOUBLE:
-        return new DoubleTreeReader(columnId);
+        return new DoubleTreeReader(fileType.getId());
      case FLOAT:
-        return new FloatTreeReader(columnId);
+        return new FloatTreeReader(fileType.getId());
      case SHORT:
-        return new ShortTreeReader(columnId);
+        return new ShortTreeReader(fileType.getId());
      case INT:
-        return new IntTreeReader(columnId);
+        return new IntTreeReader(fileType.getId());
      case LONG:
-        return new LongTreeReader(columnId, skipCorrupt);
+        return new LongTreeReader(fileType.getId(), skipCorrupt);
      case STRING:
-        return new StringTreeReader(columnId);
+        return new StringTreeReader(fileType.getId());
      case CHAR:
-        if (!type.hasMaximumLength()) {
-          throw new IllegalArgumentException("ORC char type has no length specified");
-        }
-        return new CharTreeReader(columnId, type.getMaximumLength());
+        return new CharTreeReader(fileType.getId(), readerType.getMaxLength());
      case VARCHAR:
-        if (!type.hasMaximumLength()) {
-          throw new IllegalArgumentException("ORC varchar type has no length specified");
-        }
-        return new VarcharTreeReader(columnId, type.getMaximumLength());
+        return new VarcharTreeReader(fileType.getId(), readerType.getMaxLength());
      case BINARY:
-        return new BinaryTreeReader(columnId);
+        return new BinaryTreeReader(fileType.getId());
      case TIMESTAMP:
-        return new TimestampTreeReader(columnId, skipCorrupt);
+        return new TimestampTreeReader(fileType.getId(), skipCorrupt);
      case DATE:
-        return new DateTreeReader(columnId);
+        return new DateTreeReader(fileType.getId());
      case DECIMAL:
-        int precision =
-            type.hasPrecision() ? type.getPrecision() : HiveDecimal.SYSTEM_DEFAULT_PRECISION;
-        int scale = type.hasScale() ? type.getScale() : HiveDecimal.SYSTEM_DEFAULT_SCALE;
-        return new DecimalTreeReader(columnId, precision, scale);
+        return new DecimalTreeReader(fileType.getId(), readerType.getPrecision(),
+            readerType.getScale());
      case STRUCT:
-        return new StructTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
+        return new StructTreeReader(fileType.getId(), readerType,
+            evolution, included, skipCorrupt);
      case LIST:
-        return new ListTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
+        return new ListTreeReader(fileType.getId(), readerType,
+            evolution, included, skipCorrupt);
      case MAP:
-        return new MapTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
+        return new MapTreeReader(fileType.getId(), readerType, evolution,
+            included, skipCorrupt);
      case UNION:
-        return new UnionTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
+        return new UnionTreeReader(fileType.getId(), readerType,
+            evolution, included, skipCorrupt);
      default:
        throw new IllegalArgumentException("Unsupported type " +
-            type.getKind());
+            readerType.getCategory());
    }
  }
}
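Note: createTreeReader now dispatches on the reader's TypeDescription and asks SchemaEvolution for the matching file column; when the file has no such column, or the column is excluded from the read, a NullTreeReader stands in, so the reader tree always has the shape of the reader schema and missing columns simply come back as nulls. A hedged sketch of the lookup this relies on; the schemas are illustrative and the SchemaEvolution constructor is shown as in contemporaneous orc-core, so treat the exact signature as an assumption:

    // Sketch: resolving reader columns against an older file's schema.
    TypeDescription fileSchema =
        TypeDescription.fromString("struct<id:bigint,name:string>");
    TypeDescription readerSchema =
        TypeDescription.fromString("struct<id:bigint,name:string,v:int>");
    SchemaEvolution evolution = new SchemaEvolution(fileSchema, readerSchema, null);
    TypeDescription vType = readerSchema.getChildren().get(2);
    // "v" does not exist in the file, so the lookup is expected to return null
    // and createTreeReader falls back to a NullTreeReader for that column.
    TypeDescription vFileType = evolution.getFileType(vType);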
http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index 816b52d..e4d2e6e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -71,14 +71,29 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
        OrcInputFormat.raiseAcidTablesMustBeReadWithAcidReaderException(conf);
      }
+      rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
      /**
       * Do we have schema on read in the configuration variables?
       */
-      TypeDescription schema = OrcInputFormat.getDesiredRowTypeDescr(conf, /* isAcidRead */ false);
-      List<OrcProto.Type> types = file.getTypes();
-      Reader.Options options = new Reader.Options();
-      options.schema(schema);
+      int dataColumns = rbCtx.getDataColumnCount();
+      TypeDescription schema =
+          OrcInputFormat.getDesiredRowTypeDescr(conf, false, dataColumns);
+      if (schema == null) {
+        schema = file.getSchema();
+        // Even if the user isn't doing schema evolution, cut the schema
+        // to the desired size.
+        if (schema.getCategory() == TypeDescription.Category.STRUCT &&
+            schema.getChildren().size() > dataColumns) {
+          schema = schema.clone();
+          List<TypeDescription> children = schema.getChildren();
+          for(int c = children.size() - 1; c >= dataColumns; --c) {
+            children.remove(c);
+          }
+        }
+      }
+      Reader.Options options = new Reader.Options().schema(schema);
+
      this.offset = fileSplit.getStart();
      this.length = fileSplit.getLength();
      options.range(offset, length);
@@ -87,8 +102,6 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
      this.reader = file.rowsOptions(options);
-      rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
-
      columnsToIncludeTruncated = rbCtx.getColumnsToIncludeTruncated(conf);
      int partitionColumnCount = rbCtx.getPartitionColumnCount();
@@ -103,9 +116,6 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
    @Override
    public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
-      if (!reader.hasNext()) {
-        return false;
-      }
      try {
        // Check and update partition cols if necessary. Ideally, this should be done
        // in CreateValue as the partition is constant per split. But since Hive uses
@@ -118,7 +128,9 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
        }
        addPartitionCols = false;
      }
-      reader.nextBatch(value);
+      if (!reader.nextBatch(value)) {
+        return false;
+      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
http://git-wip-us.apache.org/repos/asf/hive/blob/0ac424f0/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
index 70fe803..8e52907 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
@@ -101,8 +101,6 @@ public class WriterImpl extends org.apache.orc.impl.WriterImpl implements Writer
    }
  }
-  private static final long NANOS_PER_MILLI = 1000000;
  /**
   * Set the value for a given column value within a batch.
   * @param rowId the row to set
