[
https://issues.apache.org/jira/browse/TAJO-1464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14633512#comment-14633512
]
ASF GitHub Bot commented on TAJO-1464:
--------------------------------------
Github user hyunsik commented on a diff in the pull request:
https://github.com/apache/tajo/pull/579#discussion_r34989058
--- Diff:
tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java
---
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.orc;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.storage.FileScanner;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.fragment.Fragment;
+import com.facebook.presto.orc.*;
+import com.facebook.presto.orc.metadata.OrcMetadataReader;
+import org.apache.tajo.storage.thirdparty.orc.HdfsOrcDataSource;
+import org.apache.tajo.util.datetime.DateTimeUtil;
+import org.joda.time.DateTimeZone;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * ORCScanner reads tuples from ORC files.
+ */
+public class ORCScanner extends FileScanner {
+ // Logger shared by all ORCScanner instances.
+ private static final Log LOG = LogFactory.getLog(ORCScanner.class);
+ // Reader over the ORC data; assigned in init() from OrcReader.createRecordReader().
+ private OrcRecordReader recordReader;
+ // One buffering vector per target column; allocated in init() via createOrcVector().
+ private Vector [] vectors;
+ // Index of the next row to hand out from the vectors; incremented by next().
+ private int currentPosInBatch = 0;
+ // Size of the current batch; next() treats -1 as end of file.
+ private int batchSize = 0;
+
+  /**
+   * Creates a scanner for one fragment of an ORC file. All arguments are
+   * passed straight to the parent {@code FileScanner}; no file is opened
+   * until {@code init()} is called.
+   *
+   * @param conf     configuration handed to the parent scanner
+   * @param schema   schema of the table being scanned
+   * @param meta     table metadata
+   * @param fragment fragment of the file this scanner covers
+   */
+  public ORCScanner(Configuration conf,
+                    final Schema schema,
+                    final TableMeta meta,
+                    final Fragment fragment) {
+    super(conf, schema, meta, fragment);
+  }
+
+  /**
+   * Allocates the ORC vector used to buffer a batch of values of the given
+   * Tajo data type.
+   *
+   * @param type Tajo data type of the column
+   * @return a new, empty vector of the matching ORC vector class
+   * @throws UnsupportedException if no vector class is mapped to the type
+   */
+  private Vector createOrcVector(TajoDataTypes.DataType type) {
+    switch (type.getType()) {
+      // buffered as longs
+      case INT1:
+      case INT2:
+      case INT4:
+      case INT8:
+      case UINT1:
+      case UINT2:
+      case UINT4:
+      case UINT8:
+      case INET4:
+      case TIMESTAMP:
+      case DATE:
+        return new LongVector();
+
+      // buffered as doubles
+      case FLOAT4:
+      case FLOAT8:
+        return new DoubleVector();
+
+      // buffered as booleans
+      case BOOLEAN:
+      case NULL_TYPE:
+        return new BooleanVector();
+
+      // buffered as byte slices
+      case BLOB:
+      case TEXT:
+      case CHAR:
+      case PROTOBUF:
+        return new SliceVector();
+
+      default: {
+        String message = "This data type is not supported currently: " + type.toString();
+        throw new UnsupportedException(message);
+      }
+    }
+  }
+
+ // File system and input stream of the scanned file; both are created in
+ // init() only when still null.
+ private FileSystem fs;
+ private FSDataInputStream fis;
+
+ // Per-target-column data filled in by init().
+ private static class ColumnInfo {
+ // Tajo data type of the column (from Column.getDataType()).
+ TajoDataTypes.DataType type;
+ // Column id looked up in the schema by the column's qualified name.
+ int id;
+ }
+
+ /**
+ * Cached type and id of every target column, in the same order as
+ * {@code targets}; built in init() and read when tuples are assembled.
+ */
+ private ColumnInfo [] targetColInfo;
+
+  /**
+   * Opens the fragment's file, sets up the ORC record reader for the target
+   * columns and loads the first batch.
+   *
+   * <p>When no target columns were set, every column of the schema is
+   * read. The file system and input stream are created only if they are
+   * still {@code null}.
+   *
+   * @throws IOException if the file cannot be opened or read
+   */
+  @Override
+  public void init() throws IOException {
+    if (targets == null) {
+      targets = schema.toArray();
+    }
+
+    super.init();
+
+    final Path path = fragment.getPath();
+
+    if (fs == null) {
+      fs = FileScanner.getFileSystem((TajoConf) conf, path);
+    }
+
+    if (fis == null) {
+      fis = fs.open(path);
+    }
+
+    OrcDataSource orcDataSource = new HdfsOrcDataSource(
+        path.toString(),
+        fis,
+        fs.getFileStatus(path).getLen(),
+        Integer.parseInt(meta.getOption(
+            StorageConstants.ORC_MAX_MERGE_DISTANCE,
+            StorageConstants.DEFAULT_ORC_MAX_MERGE_DISTANCE)));
+
+    // For each target column, in one pass: cache its type and schema id,
+    // allocate its buffering vector, and record the id for the reader.
+    targetColInfo = new ColumnInfo[targets.length];
+    vectors = new Vector[targets.length];
+    Set<Integer> columnSet = new HashSet<Integer>();
+    for (int i = 0; i < targets.length; i++) {
+      ColumnInfo cinfo = new ColumnInfo();
+      cinfo.type = targets[i].getDataType();
+      cinfo.id = schema.getColumnId(targets[i].getQualifiedName());
+      targetColInfo[i] = cinfo;
+
+      vectors[i] = createOrcVector(cinfo.type);
+      columnSet.add(cinfo.id);
+    }
+
+    OrcReader orcReader = new OrcReader(orcDataSource, new OrcMetadataReader());
+
+    // TODO: make OrcPredicate useful
+    // TODO: TimeZone should be from conf
+    // TODO: it might be splittable
+    recordReader = orcReader.createRecordReader(columnSet, OrcPredicate.TRUE,
+        fragment.getStartKey(), fragment.getLength(), DateTimeZone.getDefault());
+
+    // Build the message only when it will actually be logged.
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("file fragment { path: " + fragment.getPath() +
+          ", start offset: " + fragment.getStartKey() +
+          ", length: " + fragment.getLength() + "}");
+    }
+
+    getNextBatch();
+  }
+
+  /**
+   * Returns the next tuple of the fragment, or {@code null} at end of file.
+   *
+   * <p>Rows are served from the buffered batch; when it is used up, the
+   * next batch is fetched with {@code getNextBatch()}. A {@code batchSize}
+   * of -1 marks end of file. That state is checked before anything else, so
+   * an empty input (EOF already hit by the batch load in {@code init()})
+   * and repeated calls after EOF return {@code null} rather than a tuple
+   * built from stale vector contents.
+   *
+   * @return a tuple with one datum per target column, or {@code null} at EOF
+   * @throws IOException if reading the next batch fails
+   */
+  @Override
+  public Tuple next() throws IOException {
+    // EOF was already reached by an earlier batch load
+    if (batchSize == -1) {
+      return null;
+    }
+
+    if (currentPosInBatch == batchSize) {
+      getNextBatch();
+
+      // EOF
+      if (batchSize == -1) {
+        return null;
+      }
+    }
+
+    Tuple tuple = new VTuple(targets.length);
+
+    for (int i = 0; i < targetColInfo.length; i++) {
+      tuple.put(i, createValueDatum(vectors[i], targetColInfo[i].type));
+    }
+
+    currentPosInBatch++;
+
+    return tuple;
+  }
+
+ // TODO: support more types
+ private Datum createValueDatum(Vector vector, TajoDataTypes.DataType
type) {
+ switch (type.getType()) {
+ case INT1:
+ case UINT1:
+ case INT2:
+ case UINT2:
+ if (((LongVector) vector).isNull[currentPosInBatch])
+ return NullDatum.get();
+
+ return DatumFactory.createInt2((short) ((LongVector)
vector).vector[currentPosInBatch]);
+
+ case INT4:
+ case UINT4:
+ if (((LongVector) vector).isNull[currentPosInBatch])
+ return NullDatum.get();
+
+ return DatumFactory.createInt4((int) ((LongVector)
vector).vector[currentPosInBatch]);
+
+ case INT8:
+ case UINT8:
+ if (((LongVector) vector).isNull[currentPosInBatch])
+ return NullDatum.get();
+
+ return DatumFactory.createInt8(((LongVector)
vector).vector[currentPosInBatch]);
+
+ case FLOAT4:
+ if (((DoubleVector) vector).isNull[currentPosInBatch])
+ return NullDatum.get();
+
+ return DatumFactory.createFloat4((float) ((DoubleVector)
vector).vector[currentPosInBatch]);
+
+ case FLOAT8:
+ if (((DoubleVector) vector).isNull[currentPosInBatch])
+ return NullDatum.get();
+
+ return DatumFactory.createFloat8(((DoubleVector)
vector).vector[currentPosInBatch]);
+
+ case BOOLEAN:
+ if (((BooleanVector) vector).isNull[currentPosInBatch])
+ return NullDatum.get();
+
+ return ((BooleanVector) vector).vector[currentPosInBatch] ?
BooleanDatum.TRUE : BooleanDatum.FALSE;
+
+ case CHAR:
+ if (((SliceVector) vector).vector[currentPosInBatch] == null)
+ return NullDatum.get();
+
+ return DatumFactory.createChar(((SliceVector)
vector).vector[currentPosInBatch].toStringUtf8());
+
+ case TEXT:
+ if (((SliceVector) vector).vector[currentPosInBatch] == null)
+ return NullDatum.get();
+
+ return DatumFactory.createText(((SliceVector)
vector).vector[currentPosInBatch].getBytes());
+
+ case BLOB:
+ if (((SliceVector) vector).vector[currentPosInBatch] == null)
+ return NullDatum.get();
+
+ return DatumFactory.createBlob(((SliceVector)
vector).vector[currentPosInBatch].getBytes());
+
+ case TIMESTAMP:
+ if (((LongVector) vector).isNull[currentPosInBatch])
+ return NullDatum.get();
+
+ return DatumFactory.createTimestamp(
+ DateTimeUtil.javaTimeToJulianTime(((LongVector)
vector).vector[currentPosInBatch]));
+
+ case DATE:
+ if (((LongVector) vector).isNull[currentPosInBatch])
+ return NullDatum.get();
+
+ return DatumFactory.createDate(
+ (int) ((LongVector) vector).vector[currentPosInBatch] +
DateTimeUtil.DAYS_FROM_JULIAN_TO_EPOCH);
+
+ case INET4:
+ if (((LongVector) vector).isNull[currentPosInBatch])
--- End diff --
ORC may not support the INET4 type.
> Add ORCFileScanner to read ORCFile table
> ----------------------------------------
>
> Key: TAJO-1464
> URL: https://issues.apache.org/jira/browse/TAJO-1464
> Project: Tajo
> Issue Type: Sub-task
> Components: Storage
> Affects Versions: 0.10.0
> Reporter: Dongjoon Hyun
> Assignee: Jongyoung Park
> Fix For: 0.11.0
>
> Attachments: TAJO-1464.patch
>
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)