Repository: tajo
Updated Branches:
  refs/heads/master 3919896e1 -> fa063f0e8
TAJO-1464: Add ORCFileScanner to read ORCFile table. Closes #579, closes #476 Signed-off-by: Jihoon Son <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/fa063f0e Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/fa063f0e Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/fa063f0e Branch: refs/heads/master Commit: fa063f0e84d4ce9cb7e690a50a6a269289052779 Parents: 3919896 Author: Jongyoung Park <[email protected]> Authored: Thu Jul 23 16:29:48 2015 +0900 Committer: Jihoon Son <[email protected]> Committed: Thu Jul 23 16:30:54 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../org/apache/tajo/catalog/CatalogUtil.java | 2 + .../src/main/proto/CatalogProtos.proto | 1 + .../org/apache/tajo/datum/TimestampDatum.java | 2 +- .../apache/tajo/storage/StorageConstants.java | 2 + .../apache/tajo/util/datetime/DateTimeUtil.java | 2 + .../src/main/resources/storage-default.xml | 11 +- .../src/test/resources/storage-default.xml | 11 +- tajo-storage/tajo-storage-hdfs/pom.xml | 5 + .../org/apache/tajo/storage/orc/ORCScanner.java | 324 +++++++++++++++++++ .../thirdparty/orc/FileOrcDataSource.java | 132 ++++++++ .../thirdparty/orc/HdfsOrcDataSource.java | 131 ++++++++ .../apache/tajo/storage/orc/TestORCScanner.java | 107 ++++++ .../src/test/resources/dataset/u_data_20.orc | Bin 0 -> 813 bytes 14 files changed, 730 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/fa063f0e/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index f81db07..9aec974 100644 --- a/CHANGES +++ b/CHANGES @@ -399,6 +399,9 @@ Release 0.11.0 - unreleased SUB TASKS + TAJO-1464: Add ORCFileScanner to read ORCFile table. (Contributed by + Jongyoung Park, Committed by jihoon) + TAJO-1693: Rearrange metric names. (hyunsik) TAJO-1496: Remove legacy CSVFile. 
(jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/fa063f0e/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java index b7244b0..8205e9b 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java @@ -305,6 +305,8 @@ public class CatalogUtil { return StoreType.ROWFILE; } else if (typeStr.equalsIgnoreCase(StoreType.RCFILE.name())) { return StoreType.RCFILE; + } else if (typeStr.equalsIgnoreCase(StoreType.ORC.name())) { + return StoreType.ORC; } else if (typeStr.equalsIgnoreCase(StoreType.PARQUET.name())) { return StoreType.PARQUET; } else if (typeStr.equalsIgnoreCase(StoreType.SEQUENCEFILE.name())) { http://git-wip-us.apache.org/repos/asf/tajo/blob/fa063f0e/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto index cb8c403..f95df0a 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto +++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto @@ -32,6 +32,7 @@ enum StoreType { RCFILE = 3; ROWFILE = 4; HCFILE = 5; + ORC = 6; PARQUET = 7; SEQUENCEFILE = 8; AVRO = 9; http://git-wip-us.apache.org/repos/asf/tajo/blob/fa063f0e/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java index 02425eb..a05f76c 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java @@ -36,7 +36,7 @@ public class TimestampDatum extends Datum { /** * - * @param timestamp UTC based + * @param timestamp UTC based Julian time microseconds */ public TimestampDatum(long timestamp) { super(TajoDataTypes.Type.TIMESTAMP); http://git-wip-us.apache.org/repos/asf/tajo/blob/fa063f0e/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java index 7158596..6df6228 100644 --- a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java +++ b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java @@ -78,6 +78,8 @@ public class StorageConstants { public static final String DEFAULT_BINARY_SERDE = "org.apache.tajo.storage.BinarySerializerDeserializer"; public static final String DEFAULT_TEXT_SERDE = "org.apache.tajo.storage.TextSerializerDeserializer"; + public static final String ORC_MAX_MERGE_DISTANCE = "orc.max.merge.distance"; + public static final String DEFAULT_ORC_MAX_MERGE_DISTANCE = "1048576"; // 1MB // Parquet file properties ------------------------------------------------- public static final String PARQUET_DEFAULT_BLOCK_SIZE; 
http://git-wip-us.apache.org/repos/asf/tajo/blob/fa063f0e/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeUtil.java index 570873d..5a338d3 100644 --- a/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeUtil.java @@ -40,6 +40,8 @@ public class DateTimeUtil { /** maximum possible number of fields in a date * string */ private static int MAXDATEFIELDS = 25; + public final static int DAYS_FROM_JULIAN_TO_EPOCH = 2440588; + public static boolean isJulianCalendar(int year, int month, int day) { return year <= 1752 && month <= 9 && day < 14; } http://git-wip-us.apache.org/repos/asf/tajo/blob/fa063f0e/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml index 09261a9..dfdff85 100644 --- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml +++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml @@ -39,7 +39,7 @@ <!--- Registered Scanner Handler --> <property> <name>tajo.storage.scanner-handler</name> - <value>text,json,raw,rcfile,row,parquet,sequencefile,avro,hbase</value> + <value>text,json,raw,rcfile,row,parquet,orc,sequencefile,avro,hbase</value> </property> <!--- Fragment Class Configurations --> @@ -68,6 +68,10 @@ <value>org.apache.tajo.storage.fragment.FileFragment</value> </property> <property> + <name>tajo.storage.fragment.orc.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> <name>tajo.storage.fragment.sequencefile.class</name> <value>org.apache.tajo.storage.fragment.FileFragment</value> </property> @@ -112,6 +116,11 @@ </property> <property> + <name>tajo.storage.scanner-handler.orc.class</name> + <value>org.apache.tajo.storage.orc.ORCScanner</value> + </property> + + <property> <name>tajo.storage.scanner-handler.sequencefile.class</name> <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value> </property> http://git-wip-us.apache.org/repos/asf/tajo/blob/fa063f0e/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml index ba7f4e8..f637da0 100644 --- a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml +++ b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml @@ -38,7 +38,7 @@ <!--- Registered Scanner Handler --> <property> <name>tajo.storage.scanner-handler</name> - <value>text,json,raw,rcfile,row,parquet,sequencefile,avro,hbase</value> + <value>text,json,raw,rcfile,row,parquet,orc,sequencefile,avro,hbase</value> </property> <!--- Fragment Class Configurations --> @@ -67,6 +67,10 @@ <value>org.apache.tajo.storage.fragment.FileFragment</value> </property> <property> + <name>tajo.storage.fragment.orc.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> 
<name>tajo.storage.fragment.sequencefile.class</name> <value>org.apache.tajo.storage.fragment.FileFragment</value> </property> @@ -111,6 +115,11 @@ </property> <property> + <name>tajo.storage.scanner-handler.orc.class</name> + <value>org.apache.tajo.storage.orc.ORCScanner</value> + </property> + + <property> <name>tajo.storage.scanner-handler.sequencefile.class</name> <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value> </property> http://git-wip-us.apache.org/repos/asf/tajo/blob/fa063f0e/tajo-storage/tajo-storage-hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/pom.xml b/tajo-storage/tajo-storage-hdfs/pom.xml index 9b98b0d..bfa5707 100644 --- a/tajo-storage/tajo-storage-hdfs/pom.xml +++ b/tajo-storage/tajo-storage-hdfs/pom.xml @@ -352,6 +352,11 @@ <groupId>io.netty</groupId> <artifactId>netty-buffer</artifactId> </dependency> + <dependency> + <groupId>com.facebook.presto</groupId> + <artifactId>presto-orc</artifactId> + <version>0.86</version> + </dependency> </dependencies> <profiles> http://git-wip-us.apache.org/repos/asf/tajo/blob/fa063f0e/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java new file mode 100644 index 0000000..9511071 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java @@ -0,0 +1,324 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.tajo.storage.orc; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.*; +import org.apache.tajo.exception.UnimplementedException; +import org.apache.tajo.plan.expr.EvalNode; +import org.apache.tajo.storage.FileScanner; +import org.apache.tajo.storage.StorageConstants; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.storage.fragment.Fragment; +import com.facebook.presto.orc.*; +import com.facebook.presto.orc.metadata.OrcMetadataReader; +import org.apache.tajo.storage.thirdparty.orc.HdfsOrcDataSource; +import org.apache.tajo.util.datetime.DateTimeUtil; +import org.joda.time.DateTimeZone; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +/** + * OrcScanner for reading ORC files + */ +public class ORCScanner extends FileScanner { + private static final Log LOG = LogFactory.getLog(ORCScanner.class); + private OrcRecordReader recordReader; + private Vector [] vectors; + private int currentPosInBatch = 0; + private int batchSize = 0; + private Tuple outTuple; + + public ORCScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment) { + super(conf, schema, meta, fragment); + } + + private Vector createOrcVector(TajoDataTypes.DataType type) { + switch (type.getType()) { + case INT1: case INT2: case INT4: case INT8: + case INET4: + case TIMESTAMP: + case DATE: + return new LongVector(); + + case FLOAT4: + case FLOAT8: + return new DoubleVector(); + + case BOOLEAN: + case NULL_TYPE: + return new BooleanVector(); + + case BLOB: + case TEXT: + case CHAR: + case PROTOBUF: + return new SliceVector(); + + default: + LOG.error("Not supported type for "+type.toString()); + throw new UnimplementedException("ORC type: "+type.toString()); + } + } + + private FileSystem fs; + private FSDataInputStream fis; + + private static class ColumnInfo { + TajoDataTypes.DataType type; + int id; + } + + /** + * Temporary array for caching column info + */ + private ColumnInfo [] targetColInfo; + + @Override + public void init() throws IOException { + OrcReader orcReader; + + if (targets == null) { + targets = schema.toArray(); + } + + super.init(); + + outTuple = new VTuple(targets.length); + + Path path = fragment.getPath(); + + if(fs == null) { + fs = FileScanner.getFileSystem((TajoConf)conf, path); + } + + if(fis == null) { + fis = fs.open(path); + } + + OrcDataSource orcDataSource = new HdfsOrcDataSource( + this.fragment.getPath().toString(), + fis, + fs.getFileStatus(path).getLen(), + Integer.parseInt(meta.getOption(StorageConstants.ORC_MAX_MERGE_DISTANCE, + StorageConstants.DEFAULT_ORC_MAX_MERGE_DISTANCE))); + + targetColInfo = new ColumnInfo[targets.length]; + for (int i=0; i<targets.length; i++) { + ColumnInfo cinfo = new ColumnInfo(); + cinfo.type = targets[i].getDataType(); + cinfo.id = schema.getColumnId(targets[i].getQualifiedName()); + targetColInfo[i] = cinfo; + } + + // creating vectors for buffering + vectors = new Vector[targetColInfo.length]; + for (int i=0; i<targetColInfo.length; i++) { + vectors[i] = createOrcVector(targetColInfo[i].type); + } + + 
Set<Integer> columnSet = new HashSet<Integer>(); + for (ColumnInfo colInfo: targetColInfo) { + columnSet.add(colInfo.id); + } + + orcReader = new OrcReader(orcDataSource, new OrcMetadataReader()); + + // TODO: make OrcPredicate useful + // TODO: TimeZone should be from conf + // TODO: it might be splittable + recordReader = orcReader.createRecordReader(columnSet, OrcPredicate.TRUE, + fragment.getStartKey(), fragment.getLength(), DateTimeZone.getDefault()); + + LOG.debug("file fragment { path: " + fragment.getPath() + + ", start offset: " + fragment.getStartKey() + + ", length: " + fragment.getLength() + "}"); + + getNextBatch(); + } + + @Override + public Tuple next() throws IOException { + if (currentPosInBatch == batchSize) { + getNextBatch(); + + // EOF + if (batchSize == -1) { + return null; + } + } + + for (int i=0; i<targetColInfo.length; i++) { + outTuple.put(i, createValueDatum(vectors[i], targetColInfo[i].type)); + } + + currentPosInBatch++; + + return outTuple; + } + + // TODO: support more types + private Datum createValueDatum(Vector vector, TajoDataTypes.DataType type) { + switch (type.getType()) { + case INT1: + case INT2: + if (((LongVector) vector).isNull[currentPosInBatch]) + return NullDatum.get(); + + return DatumFactory.createInt2((short) ((LongVector) vector).vector[currentPosInBatch]); + + case INT4: + if (((LongVector) vector).isNull[currentPosInBatch]) + return NullDatum.get(); + + return DatumFactory.createInt4((int) ((LongVector) vector).vector[currentPosInBatch]); + + case INT8: + if (((LongVector) vector).isNull[currentPosInBatch]) + return NullDatum.get(); + + return DatumFactory.createInt8(((LongVector) vector).vector[currentPosInBatch]); + + case FLOAT4: + if (((DoubleVector) vector).isNull[currentPosInBatch]) + return NullDatum.get(); + + return DatumFactory.createFloat4((float) ((DoubleVector) vector).vector[currentPosInBatch]); + + case FLOAT8: + if (((DoubleVector) vector).isNull[currentPosInBatch]) + return NullDatum.get(); + + return DatumFactory.createFloat8(((DoubleVector) vector).vector[currentPosInBatch]); + + case BOOLEAN: + if (((BooleanVector) vector).isNull[currentPosInBatch]) + return NullDatum.get(); + + return ((BooleanVector) vector).vector[currentPosInBatch] ? 
BooleanDatum.TRUE : BooleanDatum.FALSE; + + case CHAR: + if (((SliceVector) vector).vector[currentPosInBatch] == null) + return NullDatum.get(); + + return DatumFactory.createChar(((SliceVector) vector).vector[currentPosInBatch].toStringUtf8()); + + case TEXT: + if (((SliceVector) vector).vector[currentPosInBatch] == null) + return NullDatum.get(); + + return DatumFactory.createText(((SliceVector) vector).vector[currentPosInBatch].getBytes()); + + case BLOB: + if (((SliceVector) vector).vector[currentPosInBatch] == null) + return NullDatum.get(); + + return DatumFactory.createBlob(((SliceVector) vector).vector[currentPosInBatch].getBytes()); + + case TIMESTAMP: + if (((LongVector) vector).isNull[currentPosInBatch]) + return NullDatum.get(); + + return DatumFactory.createTimestamp( + DateTimeUtil.javaTimeToJulianTime(((LongVector) vector).vector[currentPosInBatch])); + + case DATE: + if (((LongVector) vector).isNull[currentPosInBatch]) + return NullDatum.get(); + + return DatumFactory.createDate( + (int) ((LongVector) vector).vector[currentPosInBatch] + DateTimeUtil.DAYS_FROM_JULIAN_TO_EPOCH); + + case INET4: + if (((LongVector) vector).isNull[currentPosInBatch]) + return NullDatum.get(); + + return DatumFactory.createInet4((int) ((LongVector) vector).vector[currentPosInBatch]); + + case NULL_TYPE: + return NullDatum.get(); + + default: + throw new UnimplementedException("ORC type: "+type.toString()); + } + } + + /** + * Fetch next batch from ORC file to vectors as many as batch size + * + * @throws IOException + */ + private void getNextBatch() throws IOException { + batchSize = recordReader.nextBatch(); + + for (int i=0; i<targetColInfo.length; i++) { + recordReader.readVector(targetColInfo[i].id, vectors[i]); + } + + currentPosInBatch = 0; + } + + @Override + public float getProgress() { + return recordReader.getProgress(); + } + + @Override + public void reset() throws IOException { + } + + @Override + public void close() throws IOException { + if (recordReader != null) { + recordReader.close(); + } + } + + @Override + public boolean isProjectable() { + return true; + } + + @Override + public boolean isSelectable() { + return false; + } + + @Override + public void setFilter(EvalNode filter) { + // TODO: implement it + } + + @Override + public boolean isSplittable() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/fa063f0e/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/FileOrcDataSource.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/FileOrcDataSource.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/FileOrcDataSource.java new file mode 100644 index 0000000..dcc1347 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/FileOrcDataSource.java @@ -0,0 +1,132 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tajo.storage.thirdparty.orc; + +import com.facebook.presto.orc.DiskRange; +import com.facebook.presto.orc.OrcDataSource; +import com.google.common.collect.ImmutableMap; +import io.airlift.slice.Slice; +import io.airlift.units.DataSize; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.facebook.presto.orc.OrcDataSourceUtils.getDiskRangeSlice; +import static com.facebook.presto.orc.OrcDataSourceUtils.mergeAdjacentDiskRanges; + +/** + * File data source class for Orc Reader + * + * Most of code is from Presto + */ +public class FileOrcDataSource + implements OrcDataSource +{ + private final File path; + private final long size; + private final RandomAccessFile input; + private final DataSize maxMergeDistance; + private long readTimeNanos; + + public FileOrcDataSource(File path, double mergeDistance) + throws IOException + { + this.path = checkNotNull(path, "path is null"); + this.size = path.length(); + this.input = new RandomAccessFile(path, "r"); + + maxMergeDistance = new DataSize(mergeDistance, DataSize.Unit.BYTE); + } + + @Override + public void close() + throws IOException + { + input.close(); + } + + @Override + public long getReadTimeNanos() + { + return readTimeNanos; + } + + @Override + public long getSize() + { + return size; + } + + @Override + public void readFully(long position, byte[] buffer) + throws IOException + { + readFully(position, buffer, 0, buffer.length); + } + + @Override + public void readFully(long position, byte[] buffer, int bufferOffset, int bufferLength) + throws IOException + { + long start = System.nanoTime(); + + input.seek(position); + input.readFully(buffer, bufferOffset, bufferLength); + + readTimeNanos += System.nanoTime() - start; + } + + @Override + public <K> Map<K, Slice> readFully(Map<K, DiskRange> diskRanges) + throws IOException + { + checkNotNull(diskRanges, "diskRanges is null"); + + if (diskRanges.isEmpty()) { + return ImmutableMap.of(); + } + + // TODO: benchmark alternatively strategies: + // 1) sort ranges and perform one read per range + // 2) single read with transferTo() using custom WritableByteChannel + + Iterable<DiskRange> mergedRanges = mergeAdjacentDiskRanges(diskRanges.values(), maxMergeDistance); + + // read ranges + Map<DiskRange, byte[]> buffers = new LinkedHashMap<DiskRange, byte[]>(); + for (DiskRange mergedRange : mergedRanges) { + // read full range in one request + byte[] buffer = new byte[mergedRange.getLength()]; + readFully(mergedRange.getOffset(), buffer); + buffers.put(mergedRange, buffer); + } + + ImmutableMap.Builder<K, Slice> slices = ImmutableMap.builder(); + for (Entry<K, DiskRange> entry : diskRanges.entrySet()) { + slices.put(entry.getKey(), getDiskRangeSlice(entry.getValue(), buffers)); + } + return slices.build(); + } + + @Override + public String toString() + { + return path.getPath(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/fa063f0e/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java new file mode 100644 index 
0000000..73ea475 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java @@ -0,0 +1,131 @@ + +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tajo.storage.thirdparty.orc; + +import com.facebook.presto.orc.DiskRange; +import com.facebook.presto.orc.OrcDataSource; +import com.google.common.collect.ImmutableMap; +import io.airlift.slice.Slice; +import io.airlift.units.DataSize; +import org.apache.hadoop.fs.FSDataInputStream; + +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; + +import static com.facebook.presto.orc.OrcDataSourceUtils.getDiskRangeSlice; +import static com.facebook.presto.orc.OrcDataSourceUtils.mergeAdjacentDiskRanges; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * HDFS File data source class for Orc Reader + * + * Most of code is from Presto + */ +public class HdfsOrcDataSource + implements OrcDataSource +{ + private final FSDataInputStream inputStream; + private final String path; + private final long size; + private final DataSize maxMergeDistance; + private long readTimeNanos; + + public HdfsOrcDataSource(String path, FSDataInputStream inputStream, long size, double maxMergeDistance) + { + this.path = checkNotNull(path, "path is null"); + this.inputStream = checkNotNull(inputStream, "inputStream is null"); + this.size = size; + checkArgument(size >= 0, "size is negative"); + + DataSize mergeDistance = new DataSize(maxMergeDistance, DataSize.Unit.BYTE); + this.maxMergeDistance = checkNotNull(mergeDistance, "maxMergeDistance is null"); + } + + @Override + public void close() + throws IOException + { + inputStream.close(); + } + + @Override + public long getReadTimeNanos() + { + return readTimeNanos; + } + + @Override + public long getSize() + { + return size; + } + + @Override + public void readFully(long position, byte[] buffer) + throws IOException + { + readFully(position, buffer, 0, buffer.length); + } + + @Override + public void readFully(long position, byte[] buffer, int bufferOffset, int bufferLength) + throws IOException + { + long start = System.nanoTime(); + + inputStream.readFully(position, buffer, bufferOffset, bufferLength); + + readTimeNanos += System.nanoTime() - start; + } + + @Override + public <K> Map<K, Slice> readFully(Map<K, DiskRange> diskRanges) + throws IOException + { + checkNotNull(diskRanges, "diskRanges is null"); + + if (diskRanges.isEmpty()) { + return ImmutableMap.of(); + } + + Iterable<DiskRange> mergedRanges = mergeAdjacentDiskRanges(diskRanges.values(), maxMergeDistance); + + // read ranges + Map<DiskRange, byte[]> buffers = new LinkedHashMap<DiskRange, byte[]>(); + for (DiskRange mergedRange : mergedRanges) { + // read full range in one request + byte[] buffer = new byte[mergedRange.getLength()]; + readFully(mergedRange.getOffset(), buffer); + buffers.put(mergedRange, buffer); + } 
+ + ImmutableMap.Builder<K, Slice> slices = ImmutableMap.builder(); + for (Entry<K, DiskRange> entry : diskRanges.entrySet()) { + slices.put(entry.getKey(), getDiskRangeSlice(entry.getValue(), buffers)); + } + return slices.build(); + } + + @Override + public String toString() + { + return path; + } +} + + http://git-wip-us.apache.org/repos/asf/tajo/blob/fa063f0e/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/orc/TestORCScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/orc/TestORCScanner.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/orc/TestORCScanner.java new file mode 100644 index 0000000..b411793 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/orc/TestORCScanner.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.orc; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.TimestampDatum; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.util.KeyValueSet; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.net.URL; + +public class TestORCScanner { + private ORCScanner orcScanner; + + public static Path getResourcePath(String path, String suffix) { + URL resultBaseURL = ClassLoader.getSystemResource(path); + return new Path(resultBaseURL.toString(), suffix); + } + + private static FileFragment getFileFragment(Configuration conf, String fileName) throws IOException { + Path tablePath = new Path(getResourcePath("dataset", "."), fileName); + FileSystem fs = FileSystem.getLocal(conf); + FileStatus status = fs.getFileStatus(tablePath); + return new FileFragment("table", tablePath, 0, status.getLen()); + } + + @Before + public void setup() throws IOException { + Schema schema = new Schema(); + schema.addColumn("userid", TajoDataTypes.Type.INT4); + schema.addColumn("movieid", TajoDataTypes.Type.INT4); + schema.addColumn("rating", TajoDataTypes.Type.INT2); + schema.addColumn("unixtimestamp", TajoDataTypes.Type.TEXT); + schema.addColumn("faketime", TajoDataTypes.Type.TIMESTAMP); + + Configuration conf = new TajoConf(); + + TableMeta meta = new TableMeta("ORC", new 
KeyValueSet()); + + Fragment fragment = getFileFragment(conf, "u_data_20.orc"); + + orcScanner = new ORCScanner(conf, schema, meta, fragment); + + orcScanner.init(); + } + + @Test + public void testReadTuple() { + try { + Tuple tuple = orcScanner.next(); + + assertEquals(tuple.getInt4(0), 196); + assertEquals(tuple.getInt4(1), 242); + assertEquals(tuple.getInt2(2), 3); + assertEquals(tuple.getText(3), "881250949"); + + // Timestamp test + TimestampDatum timestamp = (TimestampDatum)tuple.asDatum(4); + + assertEquals(timestamp.getYear(), 2008); + assertEquals(timestamp.getMonthOfYear(), 12); + assertEquals(timestamp.getDayOfMonth(), 12); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @After + public void end() { + try { + orcScanner.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/fa063f0e/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/u_data_20.orc ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/u_data_20.orc b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/u_data_20.orc new file mode 100644 index 0000000..e6e9c49 Binary files /dev/null and b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/u_data_20.orc differ
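
For readers who want to exercise the new scanner outside of the unit test, below is a minimal sketch modeled on TestORCScanner above. It assumes an ORC file with the same five-column layout as the bundled u_data_20.orc fixture; the class name OrcScanExample, the command-line path argument, and the printed columns are illustrative only and are not part of this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.orc.ORCScanner;
import org.apache.tajo.util.KeyValueSet;

import java.io.IOException;

public class OrcScanExample {  // hypothetical example class, not part of this patch
  public static void main(String[] args) throws IOException {
    Configuration conf = new TajoConf();

    // Same column layout as the bundled u_data_20.orc test fixture.
    Schema schema = new Schema();
    schema.addColumn("userid", TajoDataTypes.Type.INT4);
    schema.addColumn("movieid", TajoDataTypes.Type.INT4);
    schema.addColumn("rating", TajoDataTypes.Type.INT2);
    schema.addColumn("unixtimestamp", TajoDataTypes.Type.TEXT);
    schema.addColumn("faketime", TajoDataTypes.Type.TIMESTAMP);

    // "ORC" resolves to StoreType.ORC via the CatalogUtil change above; ORC read
    // options such as orc.max.merge.distance could be supplied in the KeyValueSet.
    TableMeta meta = new TableMeta("ORC", new KeyValueSet());

    Path path = new Path(args[0]);  // e.g. file:///tmp/u_data_20.orc
    FileSystem fs = path.getFileSystem(conf);
    FileFragment fragment = new FileFragment("table", path, 0, fs.getFileStatus(path).getLen());

    ORCScanner scanner = new ORCScanner(conf, schema, meta, fragment);
    scanner.init();  // opens the file and loads the first vector batch
    try {
      Tuple tuple;
      while ((tuple = scanner.next()) != null) {  // next() returns null at end of file
        System.out.println(tuple.getInt4(0) + "\t" + tuple.getInt4(1) + "\t" + tuple.getInt2(2));
      }
    } finally {
      scanner.close();
    }
  }
}

In normal operation the scanner is not instantiated by hand: the tajo.storage.scanner-handler.orc.class entries added to storage-default.xml above let Tajo pick ORCScanner automatically whenever a table is declared with store type ORC.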
