Repository: tajo
Updated Branches:
  refs/heads/master 3919896e1 -> fa063f0e8


TAJO-1464: Add ORCFileScanner to read ORCFile table.

Closes #579, closes #476

Signed-off-by: Jihoon Son <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/fa063f0e
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/fa063f0e
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/fa063f0e

Branch: refs/heads/master
Commit: fa063f0e84d4ce9cb7e690a50a6a269289052779
Parents: 3919896
Author: Jongyoung Park <[email protected]>
Authored: Thu Jul 23 16:29:48 2015 +0900
Committer: Jihoon Son <[email protected]>
Committed: Thu Jul 23 16:30:54 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../org/apache/tajo/catalog/CatalogUtil.java    |   2 +
 .../src/main/proto/CatalogProtos.proto          |   1 +
 .../org/apache/tajo/datum/TimestampDatum.java   |   2 +-
 .../apache/tajo/storage/StorageConstants.java   |   2 +
 .../apache/tajo/util/datetime/DateTimeUtil.java |   2 +
 .../src/main/resources/storage-default.xml      |  11 +-
 .../src/test/resources/storage-default.xml      |  11 +-
 tajo-storage/tajo-storage-hdfs/pom.xml          |   5 +
 .../org/apache/tajo/storage/orc/ORCScanner.java | 324 +++++++++++++++++++
 .../thirdparty/orc/FileOrcDataSource.java       | 132 ++++++++
 .../thirdparty/orc/HdfsOrcDataSource.java       | 131 ++++++++
 .../apache/tajo/storage/orc/TestORCScanner.java | 107 ++++++
 .../src/test/resources/dataset/u_data_20.orc    | Bin 0 -> 813 bytes
 14 files changed, 730 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/fa063f0e/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index f81db07..9aec974 100644
--- a/CHANGES
+++ b/CHANGES
@@ -399,6 +399,9 @@ Release 0.11.0 - unreleased
 
   SUB TASKS
 
+    TAJO-1464: Add ORCFileScanner to read ORCFile table. (Contributed by
+    Jongyoung Park, Committed by jihoon)
+
     TAJO-1693: Rearrange metric names. (hyunsik)
 
     TAJO-1496: Remove legacy CSVFile. (jinho)

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa063f0e/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
index b7244b0..8205e9b 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
@@ -305,6 +305,8 @@ public class CatalogUtil {
       return StoreType.ROWFILE;
     } else if (typeStr.equalsIgnoreCase(StoreType.RCFILE.name())) {
       return StoreType.RCFILE;
+    } else if (typeStr.equalsIgnoreCase(StoreType.ORC.name())) {
+      return StoreType.ORC;
     } else if (typeStr.equalsIgnoreCase(StoreType.PARQUET.name())) {
       return StoreType.PARQUET;
     } else if (typeStr.equalsIgnoreCase(StoreType.SEQUENCEFILE.name())) {
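This branch is what lets "orc" in a table definition resolve to the new store type. A minimal sketch of a caller, assuming the enclosing method is CatalogUtil.getStoreType(String) and that the lookup is case-insensitive as the code above suggests:

    // Hypothetical caller; the method name is assumed from the hunk context.
    StoreType type = CatalogUtil.getStoreType("orc");
    assert type == StoreType.ORC;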

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa063f0e/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index cb8c403..f95df0a 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -32,6 +32,7 @@ enum StoreType {
   RCFILE = 3;
   ROWFILE = 4;
   HCFILE = 5;
+  ORC = 6;
   PARQUET = 7;
   SEQUENCEFILE = 8;
   AVRO = 9;

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa063f0e/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java
index 02425eb..a05f76c 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java
@@ -36,7 +36,7 @@ public class TimestampDatum extends Datum {
 
   /**
    *
-   * @param timestamp UTC based
+   * @param timestamp UTC based Julian time microseconds
    */
   public TimestampDatum(long timestamp) {
     super(TajoDataTypes.Type.TIMESTAMP);

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa063f0e/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
index 7158596..6df6228 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
@@ -78,6 +78,8 @@ public class StorageConstants {
   public static final String DEFAULT_BINARY_SERDE = "org.apache.tajo.storage.BinarySerializerDeserializer";
   public static final String DEFAULT_TEXT_SERDE = "org.apache.tajo.storage.TextSerializerDeserializer";
 
+  public static final String ORC_MAX_MERGE_DISTANCE = "orc.max.merge.distance";
+  public static final String DEFAULT_ORC_MAX_MERGE_DISTANCE = "1048576";  // 1MB
 
   // Parquet file properties -------------------------------------------------
   public static final String PARQUET_DEFAULT_BLOCK_SIZE;
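The new constants let a table override how aggressively the ORC reader merges nearby reads, with the scanner falling back to the 1MB default when the option is absent. A minimal sketch of setting the option per table, assuming KeyValueSet.set(String, String) behaves as the test below implies:

    // Illustrative only; "4194304" (4MB) is a made-up example value.
    KeyValueSet options = new KeyValueSet();
    options.set(StorageConstants.ORC_MAX_MERGE_DISTANCE, "4194304");
    TableMeta meta = new TableMeta("ORC", options);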

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa063f0e/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeUtil.java
index 570873d..5a338d3 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/datetime/DateTimeUtil.java
@@ -40,6 +40,8 @@ public class DateTimeUtil {
   /** maximum possible number of fields in a date string */
   private static int MAXDATEFIELDS = 25;
 
+  public final static int DAYS_FROM_JULIAN_TO_EPOCH = 2440588;
+
   public static boolean isJulianCalendar(int year, int month, int day) {
     return year <= 1752 && month <= 9 && day < 14;
   }
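2440588 is the Julian Day Number of the Unix epoch (1970-01-01), so adding this constant converts a days-since-epoch value, which is how ORC encodes dates, into the Julian-day representation Tajo uses internally. A one-line sketch of the conversion ORCScanner performs below (daysSinceEpoch is a hypothetical variable name):

    // An ORC date of 0 (1970-01-01) maps to Julian day 2440588.
    int julianDay = daysSinceEpoch + DateTimeUtil.DAYS_FROM_JULIAN_TO_EPOCH;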

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa063f0e/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
index 09261a9..dfdff85 100644
--- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
@@ -39,7 +39,7 @@
   <!--- Registered Scanner Handler -->
   <property>
     <name>tajo.storage.scanner-handler</name>
-    <value>text,json,raw,rcfile,row,parquet,sequencefile,avro,hbase</value>
+    <value>text,json,raw,rcfile,row,parquet,orc,sequencefile,avro,hbase</value>
   </property>
 
   <!--- Fragment Class Configurations -->
@@ -68,6 +68,10 @@
     <value>org.apache.tajo.storage.fragment.FileFragment</value>
   </property>
   <property>
+    <name>tajo.storage.fragment.orc.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
     <name>tajo.storage.fragment.sequencefile.class</name>
     <value>org.apache.tajo.storage.fragment.FileFragment</value>
   </property>
@@ -112,6 +116,11 @@
   </property>
 
   <property>
+    <name>tajo.storage.scanner-handler.orc.class</name>
+    <value>org.apache.tajo.storage.orc.ORCScanner</value>
+  </property>
+
+  <property>
     <name>tajo.storage.scanner-handler.sequencefile.class</name>
     <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value>
   </property>
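These properties are what tie the "orc" store type to its scanner implementation at runtime. A simplified sketch of the lookup; Tajo's actual resolution goes through its storage manager, so treat this as illustrative only:

    Configuration conf = new Configuration();
    conf.addResource("storage-default.xml");
    String scannerClass = conf.get("tajo.storage.scanner-handler.orc.class");
    // scannerClass -> "org.apache.tajo.storage.orc.ORCScanner"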

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa063f0e/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
index ba7f4e8..f637da0 100644
--- a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
@@ -38,7 +38,7 @@
   <!--- Registered Scanner Handler -->
   <property>
     <name>tajo.storage.scanner-handler</name>
-    <value>text,json,raw,rcfile,row,parquet,sequencefile,avro,hbase</value>
+    <value>text,json,raw,rcfile,row,parquet,orc,sequencefile,avro,hbase</value>
   </property>
 
   <!--- Fragment Class Configurations -->
@@ -67,6 +67,10 @@
     <value>org.apache.tajo.storage.fragment.FileFragment</value>
   </property>
   <property>
+    <name>tajo.storage.fragment.orc.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
     <name>tajo.storage.fragment.sequencefile.class</name>
     <value>org.apache.tajo.storage.fragment.FileFragment</value>
   </property>
@@ -111,6 +115,11 @@
   </property>
 
   <property>
+    <name>tajo.storage.scanner-handler.orc.class</name>
+    <value>org.apache.tajo.storage.orc.ORCScanner</value>
+  </property>
+
+  <property>
     <name>tajo.storage.scanner-handler.sequencefile.class</name>
     <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value>
   </property>

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa063f0e/tajo-storage/tajo-storage-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/pom.xml b/tajo-storage/tajo-storage-hdfs/pom.xml
index 9b98b0d..bfa5707 100644
--- a/tajo-storage/tajo-storage-hdfs/pom.xml
+++ b/tajo-storage/tajo-storage-hdfs/pom.xml
@@ -352,6 +352,11 @@
       <groupId>io.netty</groupId>
       <artifactId>netty-buffer</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.facebook.presto</groupId>
+      <artifactId>presto-orc</artifactId>
+      <version>0.86</version>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa063f0e/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java
new file mode 100644
index 0000000..9511071
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.orc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.exception.UnimplementedException;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.storage.FileScanner;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.fragment.Fragment;
+import com.facebook.presto.orc.*;
+import com.facebook.presto.orc.metadata.OrcMetadataReader;
+import org.apache.tajo.storage.thirdparty.orc.HdfsOrcDataSource;
+import org.apache.tajo.util.datetime.DateTimeUtil;
+import org.joda.time.DateTimeZone;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * ORCScanner for reading ORC files
+ */
+public class ORCScanner extends FileScanner {
+  private static final Log LOG = LogFactory.getLog(ORCScanner.class);
+  private OrcRecordReader recordReader;
+  private Vector [] vectors;
+  private int currentPosInBatch = 0;
+  private int batchSize = 0;
+  private Tuple outTuple;
+
+  public ORCScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment) {
+    super(conf, schema, meta, fragment);
+  }
+
+  private Vector createOrcVector(TajoDataTypes.DataType type) {
+    switch (type.getType()) {
+      case INT1: case INT2: case INT4: case INT8:
+      case INET4:
+      case TIMESTAMP:
+      case DATE:
+        return new LongVector();
+
+      case FLOAT4:
+      case FLOAT8:
+        return new DoubleVector();
+
+      case BOOLEAN:
+      case NULL_TYPE:
+        return new BooleanVector();
+
+      case BLOB:
+      case TEXT:
+      case CHAR:
+      case PROTOBUF:
+        return new SliceVector();
+
+      default:
+        LOG.error("Unsupported type: " + type.toString());
+        throw new UnimplementedException("ORC type: " + type.toString());
+    }
+  }
+
+  private FileSystem fs;
+  private FSDataInputStream fis;
+
+  private static class ColumnInfo {
+    TajoDataTypes.DataType type;
+    int id;
+  }
+
+  /**
+   * Temporary array for caching column info
+   */
+  private ColumnInfo [] targetColInfo;
+
+  @Override
+  public void init() throws IOException {
+    OrcReader orcReader;
+
+    if (targets == null) {
+      targets = schema.toArray();
+    }
+
+    super.init();
+
+    outTuple = new VTuple(targets.length);
+
+    Path path = fragment.getPath();
+
+    if(fs == null) {
+      fs = FileScanner.getFileSystem((TajoConf)conf, path);
+    }
+
+    if(fis == null) {
+      fis = fs.open(path);
+    }
+
+    OrcDataSource orcDataSource = new HdfsOrcDataSource(
+        this.fragment.getPath().toString(),
+        fis,
+        fs.getFileStatus(path).getLen(),
+        Integer.parseInt(meta.getOption(StorageConstants.ORC_MAX_MERGE_DISTANCE,
+          StorageConstants.DEFAULT_ORC_MAX_MERGE_DISTANCE)));
+
+    targetColInfo = new ColumnInfo[targets.length];
+    for (int i=0; i<targets.length; i++) {
+      ColumnInfo cinfo = new ColumnInfo();
+      cinfo.type = targets[i].getDataType();
+      cinfo.id = schema.getColumnId(targets[i].getQualifiedName());
+      targetColInfo[i] = cinfo;
+    }
+
+    // creating vectors for buffering
+    vectors = new Vector[targetColInfo.length];
+    for (int i=0; i<targetColInfo.length; i++) {
+      vectors[i] = createOrcVector(targetColInfo[i].type);
+    }
+
+    Set<Integer> columnSet = new HashSet<Integer>();
+    for (ColumnInfo colInfo: targetColInfo) {
+      columnSet.add(colInfo.id);
+    }
+
+    orcReader = new OrcReader(orcDataSource, new OrcMetadataReader());
+
+    // TODO: make OrcPredicate useful
+    // TODO: TimeZone should be from conf
+    // TODO: it might be splittable
+    recordReader = orcReader.createRecordReader(columnSet, OrcPredicate.TRUE,
+        fragment.getStartKey(), fragment.getLength(), DateTimeZone.getDefault());
+
+    LOG.debug("file fragment { path: " + fragment.getPath() +
+      ", start offset: " + fragment.getStartKey() +
+      ", length: " + fragment.getLength() + "}");
+
+    getNextBatch();
+  }
+
+  @Override
+  public Tuple next() throws IOException {
+    if (currentPosInBatch >= batchSize) {  // batchSize is -1 at EOF
+      getNextBatch();
+
+      // EOF
+      if (batchSize == -1) {
+        return null;
+      }
+    }
+
+    for (int i=0; i<targetColInfo.length; i++) {
+      outTuple.put(i, createValueDatum(vectors[i], targetColInfo[i].type));
+    }
+
+    currentPosInBatch++;
+
+    return outTuple;
+  }
+
+  // TODO: support more types
+  private Datum createValueDatum(Vector vector, TajoDataTypes.DataType type) {
+    switch (type.getType()) {
+      case INT1:
+      case INT2:
+        if (((LongVector) vector).isNull[currentPosInBatch])
+          return NullDatum.get();
+
+        return DatumFactory.createInt2((short) ((LongVector) vector).vector[currentPosInBatch]);
+
+      case INT4:
+        if (((LongVector) vector).isNull[currentPosInBatch])
+          return NullDatum.get();
+
+        return DatumFactory.createInt4((int) ((LongVector) vector).vector[currentPosInBatch]);
+
+      case INT8:
+        if (((LongVector) vector).isNull[currentPosInBatch])
+          return NullDatum.get();
+
+        return DatumFactory.createInt8(((LongVector) vector).vector[currentPosInBatch]);
+
+      case FLOAT4:
+        if (((DoubleVector) vector).isNull[currentPosInBatch])
+          return NullDatum.get();
+
+        return DatumFactory.createFloat4((float) ((DoubleVector) vector).vector[currentPosInBatch]);
+
+      case FLOAT8:
+        if (((DoubleVector) vector).isNull[currentPosInBatch])
+          return NullDatum.get();
+
+        return DatumFactory.createFloat8(((DoubleVector) vector).vector[currentPosInBatch]);
+
+      case BOOLEAN:
+        if (((BooleanVector) vector).isNull[currentPosInBatch])
+          return NullDatum.get();
+
+        return ((BooleanVector) vector).vector[currentPosInBatch] ? BooleanDatum.TRUE : BooleanDatum.FALSE;
+
+      case CHAR:
+        if (((SliceVector) vector).vector[currentPosInBatch] == null)
+          return NullDatum.get();
+
+        return DatumFactory.createChar(((SliceVector) vector).vector[currentPosInBatch].toStringUtf8());
+
+      case TEXT:
+        if (((SliceVector) vector).vector[currentPosInBatch] == null)
+          return NullDatum.get();
+
+        return DatumFactory.createText(((SliceVector) vector).vector[currentPosInBatch].getBytes());
+
+      case BLOB:
+        if (((SliceVector) vector).vector[currentPosInBatch] == null)
+          return NullDatum.get();
+
+        return DatumFactory.createBlob(((SliceVector) vector).vector[currentPosInBatch].getBytes());
+
+      case TIMESTAMP:
+        if (((LongVector) vector).isNull[currentPosInBatch])
+          return NullDatum.get();
+
+        return DatumFactory.createTimestamp(
+          DateTimeUtil.javaTimeToJulianTime(((LongVector) vector).vector[currentPosInBatch]));
+
+      case DATE:
+        if (((LongVector) vector).isNull[currentPosInBatch])
+          return NullDatum.get();
+
+        return DatumFactory.createDate(
+          (int) ((LongVector) vector).vector[currentPosInBatch] + DateTimeUtil.DAYS_FROM_JULIAN_TO_EPOCH);
+
+      case INET4:
+        if (((LongVector) vector).isNull[currentPosInBatch])
+          return NullDatum.get();
+
+        return DatumFactory.createInet4((int) ((LongVector) vector).vector[currentPosInBatch]);
+
+      case NULL_TYPE:
+        return NullDatum.get();
+
+      default:
+        throw new UnimplementedException("ORC type: "+type.toString());
+    }
+  }
+
+  /**
+   * Fetches the next batch of rows from the ORC file into the column vectors.
+   *
+   * @throws IOException
+   */
+  private void getNextBatch() throws IOException {
+    batchSize = recordReader.nextBatch();
+
+    for (int i=0; i<targetColInfo.length; i++) {
+      recordReader.readVector(targetColInfo[i].id, vectors[i]);
+    }
+
+    currentPosInBatch = 0;
+  }
+
+  @Override
+  public float getProgress() {
+    return recordReader.getProgress();
+  }
+
+  @Override
+  public void reset() throws IOException {
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (recordReader != null) {
+      recordReader.close();
+    }
+  }
+
+  @Override
+  public boolean isProjectable() {
+    return true;
+  }
+
+  @Override
+  public boolean isSelectable() {
+    return false;
+  }
+
+  @Override
+  public void setFilter(EvalNode filter) {
+    // TODO: implement it
+  }
+
+  @Override
+  public boolean isSplittable() {
+    return true;
+  }
+}
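Taken together, the class follows the usual FileScanner life cycle: init() opens the file and primes the first vector batch, next() materializes one Tuple per call and refills the batch on demand, and close() releases the record reader. A minimal read loop, mirroring TestORCScanner further down; process() is a placeholder:

    ORCScanner scanner = new ORCScanner(conf, schema, meta, fragment);
    scanner.init();
    Tuple tuple;
    while ((tuple = scanner.next()) != null) {  // null signals EOF
      process(tuple);
    }
    scanner.close();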

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa063f0e/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/FileOrcDataSource.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/FileOrcDataSource.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/FileOrcDataSource.java
new file mode 100644
index 0000000..dcc1347
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/FileOrcDataSource.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.storage.thirdparty.orc;
+
+import com.facebook.presto.orc.DiskRange;
+import com.facebook.presto.orc.OrcDataSource;
+import com.google.common.collect.ImmutableMap;
+import io.airlift.slice.Slice;
+import io.airlift.units.DataSize;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.facebook.presto.orc.OrcDataSourceUtils.getDiskRangeSlice;
+import static com.facebook.presto.orc.OrcDataSourceUtils.mergeAdjacentDiskRanges;
+
+/**
+ * File data source class for Orc Reader
+ *
+ * Most of this code is from Presto
+ */
+public class FileOrcDataSource
+        implements OrcDataSource
+{
+    private final File path;
+    private final long size;
+    private final RandomAccessFile input;
+    private final DataSize maxMergeDistance;
+    private long readTimeNanos;
+
+    public FileOrcDataSource(File path, double mergeDistance)
+            throws IOException
+    {
+        this.path = checkNotNull(path, "path is null");
+        this.size = path.length();
+        this.input = new RandomAccessFile(path, "r");
+
+        maxMergeDistance = new DataSize(mergeDistance, DataSize.Unit.BYTE);
+    }
+
+    @Override
+    public void close()
+            throws IOException
+    {
+        input.close();
+    }
+
+    @Override
+    public long getReadTimeNanos()
+    {
+        return readTimeNanos;
+    }
+
+    @Override
+    public long getSize()
+    {
+        return size;
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer)
+            throws IOException
+    {
+        readFully(position, buffer, 0, buffer.length);
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer, int bufferOffset, int bufferLength)
+            throws IOException
+    {
+        long start = System.nanoTime();
+
+        input.seek(position);
+        input.readFully(buffer, bufferOffset, bufferLength);
+
+        readTimeNanos += System.nanoTime() - start;
+    }
+
+    @Override
+    public <K> Map<K, Slice> readFully(Map<K, DiskRange> diskRanges)
+            throws IOException
+    {
+        checkNotNull(diskRanges, "diskRanges is null");
+
+        if (diskRanges.isEmpty()) {
+            return ImmutableMap.of();
+        }
+
+        // TODO: benchmark alternative strategies:
+        // 1) sort ranges and perform one read per range
+        // 2) single read with transferTo() using custom WritableByteChannel
+
+        Iterable<DiskRange> mergedRanges = mergeAdjacentDiskRanges(diskRanges.values(), maxMergeDistance);
+
+        // read ranges
+        Map<DiskRange, byte[]> buffers = new LinkedHashMap<DiskRange, byte[]>();
+        for (DiskRange mergedRange : mergedRanges) {
+            // read full range in one request
+            byte[] buffer = new byte[mergedRange.getLength()];
+            readFully(mergedRange.getOffset(), buffer);
+            buffers.put(mergedRange, buffer);
+        }
+
+        ImmutableMap.Builder<K, Slice> slices = ImmutableMap.builder();
+        for (Entry<K, DiskRange> entry : diskRanges.entrySet()) {
+            slices.put(entry.getKey(), getDiskRangeSlice(entry.getValue(), buffers));
+        }
+        return slices.build();
+    }
+
+    @Override
+    public String toString()
+    {
+        return path.getPath();
+    }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa063f0e/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java
new file mode 100644
index 0000000..73ea475
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java
@@ -0,0 +1,131 @@
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.storage.thirdparty.orc;
+
+import com.facebook.presto.orc.DiskRange;
+import com.facebook.presto.orc.OrcDataSource;
+import com.google.common.collect.ImmutableMap;
+import io.airlift.slice.Slice;
+import io.airlift.units.DataSize;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import static com.facebook.presto.orc.OrcDataSourceUtils.getDiskRangeSlice;
+import static com.facebook.presto.orc.OrcDataSourceUtils.mergeAdjacentDiskRanges;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * HDFS File data source class for Orc Reader
+ *
+ * Most of this code is from Presto
+ */
+public class HdfsOrcDataSource
+  implements OrcDataSource
+{
+  private final FSDataInputStream inputStream;
+  private final String path;
+  private final long size;
+  private final DataSize maxMergeDistance;
+  private long readTimeNanos;
+
+  public HdfsOrcDataSource(String path, FSDataInputStream inputStream, long size, double maxMergeDistance)
+  {
+    this.path = checkNotNull(path, "path is null");
+    this.inputStream = checkNotNull(inputStream, "inputStream is null");
+    this.size = size;
+    checkArgument(size >= 0, "size is negative");
+
+    DataSize mergeDistance = new DataSize(maxMergeDistance, DataSize.Unit.BYTE);
+    this.maxMergeDistance = checkNotNull(mergeDistance, "maxMergeDistance is null");
+  }
+
+  @Override
+  public void close()
+    throws IOException
+  {
+    inputStream.close();
+  }
+
+  @Override
+  public long getReadTimeNanos()
+  {
+    return readTimeNanos;
+  }
+
+  @Override
+  public long getSize()
+  {
+    return size;
+  }
+
+  @Override
+  public void readFully(long position, byte[] buffer)
+    throws IOException
+  {
+    readFully(position, buffer, 0, buffer.length);
+  }
+
+  @Override
+  public void readFully(long position, byte[] buffer, int bufferOffset, int bufferLength)
+    throws IOException
+  {
+    long start = System.nanoTime();
+
+    inputStream.readFully(position, buffer, bufferOffset, bufferLength);
+
+    readTimeNanos += System.nanoTime() - start;
+  }
+
+  @Override
+  public <K> Map<K, Slice> readFully(Map<K, DiskRange> diskRanges)
+    throws IOException
+  {
+    checkNotNull(diskRanges, "diskRanges is null");
+
+    if (diskRanges.isEmpty()) {
+      return ImmutableMap.of();
+    }
+
+    Iterable<DiskRange> mergedRanges = mergeAdjacentDiskRanges(diskRanges.values(), maxMergeDistance);
+
+    // read ranges
+    Map<DiskRange, byte[]> buffers = new LinkedHashMap<DiskRange, byte[]>();
+    for (DiskRange mergedRange : mergedRanges) {
+      // read full range in one request
+      byte[] buffer = new byte[mergedRange.getLength()];
+      readFully(mergedRange.getOffset(), buffer);
+      buffers.put(mergedRange, buffer);
+    }
+
+    ImmutableMap.Builder<K, Slice> slices = ImmutableMap.builder();
+    for (Entry<K, DiskRange> entry : diskRanges.entrySet()) {
+      slices.put(entry.getKey(), getDiskRangeSlice(entry.getValue(), buffers));
+    }
+    return slices.build();
+  }
+
+  @Override
+  public String toString()
+  {
+    return path;
+  }
+}
+
+
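The maxMergeDistance in both data sources trades a little over-read for fewer seeks: disk ranges closer together than the limit are coalesced into one request. A hedged illustration, assuming presto-orc 0.86 exposes a DiskRange(offset, length) constructor:

    // Two ranges 50 bytes apart: with a 1MB merge distance they are fetched
    // as one request covering bytes [0, 250), and getDiskRangeSlice carves
    // each original range back out of the merged buffer.
    Map<String, DiskRange> ranges = ImmutableMap.of(
        "footer", new DiskRange(0, 100),
        "stripe", new DiskRange(150, 100));
    Map<String, Slice> slices = dataSource.readFully(ranges);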

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa063f0e/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/orc/TestORCScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/orc/TestORCScanner.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/orc/TestORCScanner.java
new file mode 100644
index 0000000..b411793
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/orc/TestORCScanner.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.orc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.TimestampDatum;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.KeyValueSet;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.net.URL;
+
+public class TestORCScanner {
+  private ORCScanner orcScanner;
+
+  public static Path getResourcePath(String path, String suffix) {
+    URL resultBaseURL = ClassLoader.getSystemResource(path);
+    return new Path(resultBaseURL.toString(), suffix);
+  }
+
+  private static FileFragment getFileFragment(Configuration conf, String fileName) throws IOException {
+    Path tablePath = new Path(getResourcePath("dataset", "."), fileName);
+    FileSystem fs = FileSystem.getLocal(conf);
+    FileStatus status = fs.getFileStatus(tablePath);
+    return new FileFragment("table", tablePath, 0, status.getLen());
+  }
+
+  @Before
+  public void setup() throws IOException {
+    Schema schema = new Schema();
+    schema.addColumn("userid", TajoDataTypes.Type.INT4);
+    schema.addColumn("movieid", TajoDataTypes.Type.INT4);
+    schema.addColumn("rating", TajoDataTypes.Type.INT2);
+    schema.addColumn("unixtimestamp", TajoDataTypes.Type.TEXT);
+    schema.addColumn("faketime", TajoDataTypes.Type.TIMESTAMP);
+
+    Configuration conf = new TajoConf();
+
+    TableMeta meta = new TableMeta("ORC", new KeyValueSet());
+
+    Fragment fragment = getFileFragment(conf, "u_data_20.orc");
+
+    orcScanner = new ORCScanner(conf, schema, meta, fragment);
+
+    orcScanner.init();
+  }
+
+  @Test
+  public void testReadTuple() {
+    try {
+      Tuple tuple = orcScanner.next();
+
+      assertEquals(tuple.getInt4(0), 196);
+      assertEquals(tuple.getInt4(1), 242);
+      assertEquals(tuple.getInt2(2), 3);
+      assertEquals(tuple.getText(3), "881250949");
+
+      // Timestamp test
+      TimestampDatum timestamp = (TimestampDatum)tuple.asDatum(4);
+
+      assertEquals(timestamp.getYear(), 2008);
+      assertEquals(timestamp.getMonthOfYear(), 12);
+      assertEquals(timestamp.getDayOfMonth(), 12);
+    } catch (IOException e) {
+      fail(e.getMessage());  // fail the test instead of swallowing the exception
+    }
+  }
+
+  @After
+  public void end() {
+    try {
+      orcScanner.close();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/fa063f0e/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/u_data_20.orc
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/u_data_20.orc b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/u_data_20.orc
new file mode 100644
index 0000000..e6e9c49
Binary files /dev/null and b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/u_data_20.orc differ
