Repository: tajo
Updated Branches:
  refs/heads/master 6debe7f8a -> 6717e3d1f
TAJO-2052: Upgrading ORC reader version.

Closes #937

Signed-off-by: JaeHwa Jung <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/6717e3d1
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/6717e3d1
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/6717e3d1

Branch: refs/heads/master
Commit: 6717e3d1f4ff234bbb99da3224978693355a6221
Parents: 6debe7f
Author: Jongyoung Park <[email protected]>
Authored: Mon Feb 1 10:49:14 2016 +0900
Committer: JaeHwa Jung <[email protected]>
Committed: Mon Feb 1 10:49:14 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../apache/tajo/storage/StorageConstants.java   |   2 +
 .../src/main/sphinx/table_management/orc.rst    |   1 +
 tajo-storage/tajo-storage-hdfs/pom.xml          |   2 +-
 .../org/apache/tajo/storage/orc/ORCScanner.java | 190 +++++++++----------
 .../TajoStructObjectInspector.java              |   5 +
 .../thirdparty/orc/FileOrcDataSource.java       | 132 -------------
 .../thirdparty/orc/HdfsOrcDataSource.java       |  24 +--
 8 files changed, 113 insertions(+), 245 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/6717e3d1/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 11fabf1..bd19aae 100644
--- a/CHANGES
+++ b/CHANGES
@@ -8,6 +8,8 @@ Release 0.12.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-2052: Upgrading ORC reader version. (Jongyoung Park via jaehwa)
+
     TAJO-1940: Implement HBaseTablespace::getTableVolume() method. (hyunsik)
 
     TAJO-2061: Add description for EXPLAIN statement. (jaehwa)

http://git-wip-us.apache.org/repos/asf/tajo/blob/6717e3d1/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
index d7f1ec5..097963c 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
@@ -82,6 +82,8 @@ public class StorageConstants {
   // ORC file properties -------------------------------------------------
   public static final String ORC_MAX_MERGE_DISTANCE = "orc.max.merge.distance";
   public static final String DEFAULT_ORC_MAX_MERGE_DISTANCE = "1048576"; // 1MB
+  public static final String ORC_MAX_READ_BUFFER_SIZE = "orc.max.read.buffer";
+  public static final String DEFAULT_ORC_MAX_READ_BUFFER_SIZE = "8388608"; // 8MB
 
   public static final String ORC_STRIPE_SIZE = "orc.stripe.size";
   public static final String DEFAULT_ORC_STRIPE_SIZE = "67108864"; // 64MB

http://git-wip-us.apache.org/repos/asf/tajo/blob/6717e3d1/tajo-docs/src/main/sphinx/table_management/orc.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/table_management/orc.rst b/tajo-docs/src/main/sphinx/table_management/orc.rst
index 2733afc..eb84b20 100644
--- a/tajo-docs/src/main/sphinx/table_management/orc.rst
+++ b/tajo-docs/src/main/sphinx/table_management/orc.rst
@@ -34,6 +34,7 @@ The ``WITH`` clause in the CREATE TABLE statement allows users to set those parameters.
 
 Now, ORC file provides the following physical properties.
 * ``orc.max.merge.distance``: When an ORC file is read, if stripes are close to each other and the distance between them is smaller than this value, they are merged and read at once. Default is 1MB.
+* ``orc.max.read.buffer``: When an ORC file is read, it defines the maximum read buffer size, that is, the maximum size of a single read. Default is 8MB.
 * ``orc.stripe.size``: It decides the size of each stripe. Default is 64MB.
 * ``orc.compression.kind``: The compression algorithm used to compress and write data. It should be one of ``none``, ``snappy``, ``zlib``. Default is ``none``.
 * ``orc.buffer.size``: It decides the size of the write buffer. Default is 256KB.

http://git-wip-us.apache.org/repos/asf/tajo/blob/6717e3d1/tajo-storage/tajo-storage-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/pom.xml b/tajo-storage/tajo-storage-hdfs/pom.xml
index 3b89e1c..6c10a88 100644
--- a/tajo-storage/tajo-storage-hdfs/pom.xml
+++ b/tajo-storage/tajo-storage-hdfs/pom.xml
@@ -358,7 +358,7 @@
     <dependency>
       <groupId>com.facebook.presto</groupId>
       <artifactId>presto-orc</artifactId>
-      <version>0.86</version>
+      <version>0.132</version>
     </dependency>
   </dependencies>

http://git-wip-us.apache.org/repos/asf/tajo/blob/6717e3d1/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java
index 9351c59..0a4ebc6 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java
@@ -18,7 +18,16 @@
 package org.apache.tajo.storage.orc;
 
+import com.facebook.presto.orc.OrcDataSource;
+import com.facebook.presto.orc.OrcPredicate;
+import com.facebook.presto.orc.OrcReader;
+import com.facebook.presto.orc.OrcRecordReader;
+import com.facebook.presto.orc.memory.AggregatedMemoryContext;
+import com.facebook.presto.orc.metadata.OrcMetadataReader;
+import com.facebook.presto.spi.block.Block;
+import com.facebook.presto.spi.type.*;
 import com.google.protobuf.InvalidProtocolBufferException;
+import io.airlift.units.DataSize;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -39,15 +48,13 @@ import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.storage.fragment.Fragment;
-import com.facebook.presto.orc.*;
-import com.facebook.presto.orc.metadata.OrcMetadataReader;
 import org.apache.tajo.storage.thirdparty.orc.HdfsOrcDataSource;
 import org.apache.tajo.util.datetime.DateTimeUtil;
 import org.joda.time.DateTimeZone;
 
 import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.TimeZone;
 
 /**
@@ -56,42 +63,16 @@ import java.util.TimeZone;
 public class ORCScanner extends FileScanner {
   private static final Log LOG = LogFactory.getLog(ORCScanner.class);
   private OrcRecordReader recordReader;
-  private Vector [] vectors;
+  private Block[] blocks;
   private int currentPosInBatch = 0;
   private int batchSize = 0;
   private Tuple outTuple;
+  private AggregatedMemoryContext aggrMemoryContext = new AggregatedMemoryContext();
 
   public ORCScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment) {
     super(conf, schema, meta, fragment);
   }
 
-  private Vector createOrcVector(TajoDataTypes.DataType type) {
-    switch (type.getType()) {
-      case INT1: case INT2: case INT4: case INT8:
-      case INET4:
-      case TIMESTAMP:
-      case DATE:
-        return new LongVector();
-
-      case FLOAT4:
-      case FLOAT8:
-        return new DoubleVector();
-
-      case BOOLEAN:
-      case NULL_TYPE:
-        return new BooleanVector();
-
-      case BLOB:
-      case TEXT:
-      case CHAR:
-      case PROTOBUF:
-        return new SliceVector();
-
-      default:
-        throw new TajoRuntimeException(new NotImplementedException(type.getType().name() + " for orc"));
-    }
-  }
-
   private FileSystem fs;
   private FSDataInputStream fis;
 
@@ -108,6 +89,10 @@
   @Override
   public void init() throws IOException {
     OrcReader orcReader;
+    DataSize maxMergeDistance = new DataSize(Double.parseDouble(meta.getProperty(StorageConstants.ORC_MAX_MERGE_DISTANCE,
+        StorageConstants.DEFAULT_ORC_MAX_MERGE_DISTANCE)), DataSize.Unit.BYTE);
+    DataSize maxReadSize = new DataSize(Double.parseDouble(meta.getProperty(StorageConstants.ORC_MAX_READ_BUFFER_SIZE,
+        StorageConstants.DEFAULT_ORC_MAX_READ_BUFFER_SIZE)), DataSize.Unit.BYTE);
 
     if (targets == null) {
       targets = schema.toArray();
@@ -129,8 +114,8 @@
         this.fragment.getPath().toString(),
         fis,
         fs.getFileStatus(path).getLen(),
-        Integer.parseInt(meta.getProperty(StorageConstants.ORC_MAX_MERGE_DISTANCE,
-            StorageConstants.DEFAULT_ORC_MAX_MERGE_DISTANCE)));
+        maxMergeDistance,
+        maxReadSize);
 
     targetColInfo = new ColumnInfo[targets.length];
     for (int i=0; i<targets.length; i++) {
@@ -140,26 +125,23 @@
       targetColInfo[i] = cinfo;
     }
 
-    // creating vectors for buffering
-    vectors = new Vector[targetColInfo.length];
-    for (int i=0; i<targetColInfo.length; i++) {
-      vectors[i] = createOrcVector(targetColInfo[i].type);
-    }
+    // creating blocks for buffering
+    blocks = new Block[targetColInfo.length];
 
-    Set<Integer> columnSet = new HashSet<>();
+    Map<Integer, Type> columnMap = new HashMap<>();
     for (ColumnInfo colInfo: targetColInfo) {
-      columnSet.add(colInfo.id);
+      columnMap.put(colInfo.id, createFBtypeByTajoType(colInfo.type));
     }
 
-    orcReader = new OrcReader(orcDataSource, new OrcMetadataReader());
+    orcReader = new OrcReader(orcDataSource, new OrcMetadataReader(), maxMergeDistance, maxReadSize);
 
     TimeZone timezone = TimeZone.getTimeZone(meta.getProperty(StorageConstants.TIMEZONE,
        TajoConstants.DEFAULT_SYSTEM_TIMEZONE));
 
     // TODO: make OrcPredicate useful
     // presto-orc uses joda timezone, so it needs to be converted.
-    recordReader = orcReader.createRecordReader(columnSet, OrcPredicate.TRUE,
-      fragment.getStartKey(), fragment.getLength(), DateTimeZone.forTimeZone(timezone));
+    recordReader = orcReader.createRecordReader(columnMap, OrcPredicate.TRUE,
+      fragment.getStartKey(), fragment.getLength(), DateTimeZone.forTimeZone(timezone), aggrMemoryContext);
 
     super.init();
     LOG.debug("file fragment { path: " + fragment.getPath() +
@@ -179,7 +161,7 @@
     }
 
     for (int i=0; i<targetColInfo.length; i++) {
-      outTuple.put(i, createValueDatum(vectors[i], targetColInfo[i].type));
+      outTuple.put(i, createValueDatum(blocks[i], targetColInfo[i].type));
     }
 
     currentPosInBatch++;
@@ -187,95 +169,101 @@
     return outTuple;
   }
 
+  private Type createFBtypeByTajoType(TajoDataTypes.DataType type) {
+    switch(type.getType()) {
+      case BOOLEAN:
+        return BooleanType.BOOLEAN;
+
+      case INT1:
+      case INT2:
+      case INT4:
+      case INT8:
+      case INET4:
+      case NULL_TYPE: // meaningless
+        return BigintType.BIGINT;
+
+      case TIMESTAMP:
+        return TimestampType.TIMESTAMP;
+
+      case DATE:
+        return DateType.DATE;
+
+      case FLOAT4:
+      case FLOAT8:
+        return DoubleType.DOUBLE;
+
+      case CHAR:
+      case TEXT:
+        return VarcharType.VARCHAR;
+
+      case BLOB:
+      case PROTOBUF:
+        return VarbinaryType.VARBINARY;
+
+      default:
+        throw new TajoRuntimeException(new NotImplementedException(type.getType().name() + " for orc"));
+    }
+  }
+
   // TODO: support more types
-  private Datum createValueDatum(Vector vector, TajoDataTypes.DataType type) {
+  private Datum createValueDatum(Block block, TajoDataTypes.DataType type) {
+    if (block.isNull(currentPosInBatch))
+      return NullDatum.get();
+
+    // NOTE: block.get***() methods are determined by the type size which is set in createFBtypeByTajoType()
     switch (type.getType()) {
       case INT1:
-      case INT2:
-        if (((LongVector) vector).isNull[currentPosInBatch])
-          return NullDatum.get();
+        return DatumFactory.createInt2((short)block.getLong(currentPosInBatch, 0));
 
-        return DatumFactory.createInt2((short) ((LongVector) vector).vector[currentPosInBatch]);
+      case INT2:
+        return DatumFactory.createInt2((short)block.getLong(currentPosInBatch, 0));
 
       case INT4:
-        if (((LongVector) vector).isNull[currentPosInBatch])
-          return NullDatum.get();
-
-        return DatumFactory.createInt4((int) ((LongVector) vector).vector[currentPosInBatch]);
+        return DatumFactory.createInt4((int)block.getLong(currentPosInBatch, 0));
 
       case INT8:
-        if (((LongVector) vector).isNull[currentPosInBatch])
-          return NullDatum.get();
-
-        return DatumFactory.createInt8(((LongVector) vector).vector[currentPosInBatch]);
+        return DatumFactory.createInt8(block.getLong(currentPosInBatch, 0));
 
       case FLOAT4:
-        if (((DoubleVector) vector).isNull[currentPosInBatch])
-          return NullDatum.get();
-
-        return DatumFactory.createFloat4((float) ((DoubleVector) vector).vector[currentPosInBatch]);
+        return DatumFactory.createFloat4((float)block.getDouble(currentPosInBatch, 0));
 
       case FLOAT8:
-        if (((DoubleVector) vector).isNull[currentPosInBatch])
-          return NullDatum.get();
-
-        return DatumFactory.createFloat8(((DoubleVector) vector).vector[currentPosInBatch]);
+        return DatumFactory.createFloat8(block.getDouble(currentPosInBatch, 0));
 
       case BOOLEAN:
-        if (((BooleanVector) vector).isNull[currentPosInBatch])
-          return NullDatum.get();
-
-        return ((BooleanVector) vector).vector[currentPosInBatch] ?
-            BooleanDatum.TRUE : BooleanDatum.FALSE;
+        return DatumFactory.createBool(block.getByte(currentPosInBatch, 0) != 0);
 
       case CHAR:
-        if (((SliceVector) vector).vector[currentPosInBatch] == null)
-          return NullDatum.get();
-
-        return DatumFactory.createChar(((SliceVector) vector).vector[currentPosInBatch].toStringUtf8());
+        return DatumFactory.createChar(block.getSlice(currentPosInBatch, 0,
+            block.getLength(currentPosInBatch)).getBytes());
 
       case TEXT:
-        if (((SliceVector) vector).vector[currentPosInBatch] == null)
-          return NullDatum.get();
-
-        return DatumFactory.createText(((SliceVector) vector).vector[currentPosInBatch].getBytes());
+        return DatumFactory.createText(block.getSlice(currentPosInBatch, 0,
+            block.getLength(currentPosInBatch)).getBytes());
 
       case BLOB:
-        if (((SliceVector) vector).vector[currentPosInBatch] == null)
-          return NullDatum.get();
-
-        return DatumFactory.createBlob(((SliceVector) vector).vector[currentPosInBatch].getBytes());
+        return DatumFactory.createBlob(block.getSlice(currentPosInBatch, 0,
+            block.getLength(currentPosInBatch)).getBytes());
 
       case PROTOBUF:
         try {
-          if (((SliceVector) vector).vector[currentPosInBatch] == null)
-            return NullDatum.get();
-
-          return ProtobufDatumFactory.createDatum(type,
-              ((SliceVector) vector).vector[currentPosInBatch].getBytes());
+          return ProtobufDatumFactory.createDatum(type, block.getSlice(currentPosInBatch, 0,
+              block.getLength(currentPosInBatch)).getBytes());
         } catch (InvalidProtocolBufferException e) {
           LOG.error("ERROR", e);
           return NullDatum.get();
         }
 
       case TIMESTAMP:
-        if (((LongVector) vector).isNull[currentPosInBatch])
-          return NullDatum.get();
-
         return DatumFactory.createTimestamp(
-            DateTimeUtil.javaTimeToJulianTime(((LongVector) vector).vector[currentPosInBatch]));
+            DateTimeUtil.javaTimeToJulianTime(block.getLong(currentPosInBatch, 0)));
 
       case DATE:
-        if (((LongVector) vector).isNull[currentPosInBatch])
-          return NullDatum.get();
-
         return DatumFactory.createDate(
-            (int) ((LongVector) vector).vector[currentPosInBatch] + DateTimeUtil.DAYS_FROM_JULIAN_TO_EPOCH);
+            block.getInt(currentPosInBatch, 0) + DateTimeUtil.DAYS_FROM_JULIAN_TO_EPOCH);
 
       case INET4:
-        if (((LongVector) vector).isNull[currentPosInBatch])
-          return NullDatum.get();
-
-        return DatumFactory.createInet4((int) ((LongVector) vector).vector[currentPosInBatch]);
+        return DatumFactory.createInet4((int)block.getLong(currentPosInBatch, 0));
 
       case NULL_TYPE:
         return NullDatum.get();
@@ -286,7 +274,7 @@
   }
 
   /**
-   * Fetch next batch from ORC file to vectors as many as batch size
+   * Fetch the next batch, up to the batch size, from the ORC file into the block buffers
    *
    * @throws IOException
    */
@@ -298,7 +286,7 @@
       return;
 
     for (int i=0; i<targetColInfo.length; i++) {
-      recordReader.readVector(targetColInfo[i].id, vectors[i]);
+      blocks[i] = recordReader.readBlock(createFBtypeByTajoType(targetColInfo[i].type), targetColInfo[i].id);
     }
 
     currentPosInBatch = 0;

http://git-wip-us.apache.org/repos/asf/tajo/blob/6717e3d1/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoStructObjectInspector.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoStructObjectInspector.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoStructObjectInspector.java
index e85913e..7521fa3 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoStructObjectInspector.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/objectinspector/TajoStructObjectInspector.java
@@ -60,6 +60,11 @@ public class TajoStructObjectInspector extends StructObjectInspector {
   }
 
   @Override
+  public int getFieldID() {
+    return 0;
+  }
+
+  @Override
   public String getFieldComment() {
     return comment;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/6717e3d1/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/FileOrcDataSource.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/FileOrcDataSource.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/FileOrcDataSource.java
deleted file mode 100644
index de7c802..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/FileOrcDataSource.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tajo.storage.thirdparty.orc;
-
-import com.facebook.presto.orc.DiskRange;
-import com.facebook.presto.orc.OrcDataSource;
-import com.google.common.collect.ImmutableMap;
-import io.airlift.slice.Slice;
-import io.airlift.units.DataSize;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.facebook.presto.orc.OrcDataSourceUtils.getDiskRangeSlice;
-import static com.facebook.presto.orc.OrcDataSourceUtils.mergeAdjacentDiskRanges;
-
-/**
- * File data source class for Orc Reader
- *
- * Most of code is from Presto
- */
-public class FileOrcDataSource
-    implements OrcDataSource
-{
-  private final File path;
-  private final long size;
-  private final RandomAccessFile input;
-  private final DataSize maxMergeDistance;
-  private long readTimeNanos;
-
-  public FileOrcDataSource(File path, double mergeDistance)
-      throws IOException
-  {
-    this.path = checkNotNull(path, "path is null");
-    this.size = path.length();
-    this.input = new RandomAccessFile(path, "r");
-
-    maxMergeDistance = new DataSize(mergeDistance, DataSize.Unit.BYTE);
-  }
-
-  @Override
-  public void close()
-      throws IOException
-  {
-    input.close();
-  }
-
-  @Override
-  public long getReadTimeNanos()
-  {
-    return readTimeNanos;
-  }
-
-  @Override
-  public long getSize()
-  {
-    return size;
-  }
-
-  @Override
-  public void readFully(long position, byte[] buffer)
-      throws IOException
-  {
-    readFully(position, buffer, 0, buffer.length);
-  }
-
-  @Override
-  public void readFully(long position, byte[] buffer, int bufferOffset, int bufferLength)
-      throws IOException
-  {
-    long start = System.nanoTime();
-
-    input.seek(position);
-    input.readFully(buffer, bufferOffset, bufferLength);
-
-    readTimeNanos += System.nanoTime() - start;
-  }
-
-  @Override
-  public <K> Map<K, Slice> readFully(Map<K, DiskRange> diskRanges)
-      throws IOException
-  {
-    checkNotNull(diskRanges, "diskRanges is null");
-
-    if (diskRanges.isEmpty()) {
-      return ImmutableMap.of();
-    }
-
-    // TODO: benchmark alternatively strategies:
-    // 1) sort ranges and perform one read per range
-    // 2) single read with transferTo() using custom WritableByteChannel
-
-    Iterable<DiskRange> mergedRanges = mergeAdjacentDiskRanges(diskRanges.values(), maxMergeDistance);
-
-    // read ranges
-    Map<DiskRange, byte[]> buffers = new LinkedHashMap<>();
-    for (DiskRange mergedRange : mergedRanges) {
-      // read full range in one request
-      byte[] buffer = new byte[mergedRange.getLength()];
-      readFully(mergedRange.getOffset(), buffer);
-      buffers.put(mergedRange, buffer);
-    }
-
-    ImmutableMap.Builder<K, Slice> slices = ImmutableMap.builder();
-    for (Entry<K, DiskRange> entry : diskRanges.entrySet()) {
-      slices.put(entry.getKey(), getDiskRangeSlice(entry.getValue(), buffers));
-    }
-    return slices.build();
-  }
-
-  @Override
-  public String toString()
-  {
-    return path.getPath();
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6717e3d1/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java
index da12461..5357f51 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java
@@ -17,7 +17,8 @@ package org.apache.tajo.storage.thirdparty.orc;
 
 import com.facebook.presto.orc.DiskRange;
 import com.facebook.presto.orc.OrcDataSource;
 import com.google.common.collect.ImmutableMap;
-import io.airlift.slice.Slice;
+import io.airlift.slice.BasicSliceInput;
+import io.airlift.slice.FixedLengthSliceInput;
 import io.airlift.units.DataSize;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -43,17 +44,19 @@
   private final String path;
   private final long size;
   private final DataSize maxMergeDistance;
+  private final DataSize maxReadSize;
   private long readTimeNanos;
 
-  public HdfsOrcDataSource(String path, FSDataInputStream inputStream, long size, double maxMergeDistance)
+  public HdfsOrcDataSource(String path, FSDataInputStream inputStream, long size,
+                           DataSize maxMergeDistance, DataSize maxReadSize)
   {
     this.path = checkNotNull(path, "path is null");
     this.inputStream = checkNotNull(inputStream, "inputStream is null");
     this.size = size;
     checkArgument(size >= 0, "size is negative");
 
-    DataSize mergeDistance = new DataSize(maxMergeDistance, DataSize.Unit.BYTE);
-    this.maxMergeDistance = checkNotNull(mergeDistance, "maxMergeDistance is null");
+    this.maxMergeDistance = checkNotNull(maxMergeDistance, "maxMergeDistance is null");
+    this.maxReadSize = checkNotNull(maxReadSize, "maxReadSize is null");
   }
 
   @Override
@@ -89,12 +92,11 @@
     long start = System.nanoTime();
 
     inputStream.readFully(position, buffer, bufferOffset, bufferLength);
-
     readTimeNanos += System.nanoTime() - start;
   }
 
   @Override
-  public <K> Map<K, Slice> readFully(Map<K, DiskRange> diskRanges)
+  public <K> Map<K, FixedLengthSliceInput> readFully(Map<K, DiskRange> diskRanges)
       throws IOException
   {
     checkNotNull(diskRanges, "diskRanges is null");
@@ -103,7 +105,7 @@
       return ImmutableMap.of();
     }
 
-    Iterable<DiskRange> mergedRanges = mergeAdjacentDiskRanges(diskRanges.values(), maxMergeDistance);
+    Iterable<DiskRange> mergedRanges = mergeAdjacentDiskRanges(diskRanges.values(), maxMergeDistance, maxReadSize);
 
     // read ranges
     Map<DiskRange, byte[]> buffers = new LinkedHashMap<>();
@@ -114,10 +116,10 @@
       buffers.put(mergedRange, buffer);
     }
 
-    ImmutableMap.Builder<K, Slice> slices = ImmutableMap.builder();
-    for (Entry<K, DiskRange> entry : diskRanges.entrySet()) {
-      slices.put(entry.getKey(), getDiskRangeSlice(entry.getValue(), buffers));
-    }
+    ImmutableMap.Builder<K, FixedLengthSliceInput> slices = ImmutableMap.builder();
+    diskRanges.forEach((K key, DiskRange range) ->
+        slices.put(key, new BasicSliceInput(getDiskRangeSlice(range, buffers))));
+
    return slices.build();
  }
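
For reference, the read path that this patch wires up against presto-orc 0.132 looks roughly as follows. This is a minimal sketch condensed from ORCScanner.init() and fetchNextBatch() above, not part of the commit: the HDFS path, the single BIGINT column, and the inline size values are illustrative assumptions, and error handling is omitted.

// Minimal sketch of the upgraded read path (presto-orc 0.132), condensed from
// ORCScanner above. File path, column choice, and sizes are hypothetical.
import com.facebook.presto.orc.OrcPredicate;
import com.facebook.presto.orc.OrcReader;
import com.facebook.presto.orc.OrcRecordReader;
import com.facebook.presto.orc.memory.AggregatedMemoryContext;
import com.facebook.presto.orc.metadata.OrcMetadataReader;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.type.BigintType;
import com.facebook.presto.spi.type.Type;
import io.airlift.units.DataSize;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.storage.thirdparty.orc.HdfsOrcDataSource;
import org.joda.time.DateTimeZone;

import java.util.HashMap;
import java.util.Map;

public class OrcReadSketch {
  public static void main(String[] args) throws Exception {
    Path path = new Path("hdfs:///tmp/example.orc");          // hypothetical input file
    FileSystem fs = path.getFileSystem(new Configuration());
    FSDataInputStream in = fs.open(path);
    long fileLen = fs.getFileStatus(path).getLen();

    // Both tuning knobs are io.airlift.units.DataSize values now, fed from the
    // table properties orc.max.merge.distance (1MB default) and orc.max.read.buffer (8MB default).
    DataSize maxMergeDistance = new DataSize(1048576d, DataSize.Unit.BYTE);
    DataSize maxReadSize = new DataSize(8388608d, DataSize.Unit.BYTE);

    HdfsOrcDataSource dataSource =
        new HdfsOrcDataSource(path.toString(), in, fileLen, maxMergeDistance, maxReadSize);
    OrcReader orcReader =
        new OrcReader(dataSource, new OrcMetadataReader(), maxMergeDistance, maxReadSize);

    // Column selection is now a Map<Integer, Type> instead of a Set<Integer>,
    // and a memory context must be passed; here column 0 is read as BIGINT.
    Map<Integer, Type> columns = new HashMap<>();
    columns.put(0, BigintType.BIGINT);
    OrcRecordReader recordReader = orcReader.createRecordReader(
        columns, OrcPredicate.TRUE, 0, fileLen, DateTimeZone.UTC, new AggregatedMemoryContext());

    // Values come back as Presto Blocks instead of Vectors: check isNull() first,
    // then use the accessor matching the Presto type (getLong for BIGINT).
    long sum = 0;
    int batchSize;
    while ((batchSize = recordReader.nextBatch()) > 0) {
      Block block = recordReader.readBlock(BigintType.BIGINT, 0);
      for (int pos = 0; pos < batchSize; pos++) {
        if (!block.isNull(pos)) {
          sum += block.getLong(pos, 0);
        }
      }
    }
    System.out.println("sum of column 0: " + sum);
    recordReader.close();
    dataSource.close();
  }
}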

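On how the two limits interact: HdfsOrcDataSource now passes both to OrcDataSourceUtils.mergeAdjacentDiskRanges, so stripe reads whose gap is at most orc.max.merge.distance are coalesced into one request, while orc.max.read.buffer caps how large any coalesced read may grow. A hypothetical illustration follows; the offsets and lengths are made up, and it assumes presto-orc 0.132 exposes the DiskRange(offset, length) constructor used here.

// Hypothetical illustration of merge-distance vs. read-buffer behavior.
import com.facebook.presto.orc.DiskRange;
import io.airlift.units.DataSize;

import java.util.Arrays;

import static com.facebook.presto.orc.OrcDataSourceUtils.mergeAdjacentDiskRanges;

public class MergeDistanceSketch {
  public static void main(String[] args) {
    DataSize maxMergeDistance = new DataSize(1048576d, DataSize.Unit.BYTE); // orc.max.merge.distance, 1MB
    DataSize maxReadSize = new DataSize(8388608d, DataSize.Unit.BYTE);      // orc.max.read.buffer, 8MB

    // Two 1KB ranges separated by a 512-byte gap: the gap is below the merge
    // distance, so they should come back as a single [0, 2560) read, as long
    // as the merged length stays under the 8MB read-buffer cap.
    Iterable<DiskRange> merged = mergeAdjacentDiskRanges(
        Arrays.asList(new DiskRange(0, 1024), new DiskRange(1536, 1024)),
        maxMergeDistance, maxReadSize);

    for (DiskRange range : merged) {
      System.out.println("offset=" + range.getOffset() + ", length=" + range.getLength());
    }
  }
}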