wagnermarkd commented on a change in pull request #3994: Adding ORC reader
URL: https://github.com/apache/incubator-pinot/pull/3994#discussion_r267959254
 
 

 ##########
 File path: 
pinot-orc/src/main/java/org/apache/pinot/orc/data/readers/ORCRecordReader.java
 ##########
 @@ -0,0 +1,207 @@
+package org.apache.pinot.orc.data.readers;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.mapred.OrcList;
+import org.apache.orc.mapred.OrcMapredRecordReader;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.data.readers.RecordReader;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The ORCRecordReader uses a VectorizedRowBatch, which we convert to a Writable. Then, we convert these
+ * Writable objects to primitives that we can then store in a GenericRow.
+ *
+ * When new data types are added to Pinot, we will need to update them here as well.
+ * Note that not all ORC types are supported; we only support the ORC types that correspond to either
+ * primitives or multivalue columns in Pinot, which is similar to other record readers.
+ */
+public class ORCRecordReader implements RecordReader {
+
+  private Schema _pinotSchema;
+  private TypeDescription _orcSchema;
+  Reader _reader;
+  org.apache.orc.RecordReader _recordReader;
+  VectorizedRowBatch _reusableVectorizedRowBatch;
+
+  public static final String LOCAL_FS_PREFIX = "file://";
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ORCRecordReader.class);
+
+  /**
+   * Opens the ORC file at the given path and prepares this reader to iterate over its rows.
+   *
+   * @param inputPath path of the ORC file; it is prefixed with {@link #LOCAL_FS_PREFIX} before being opened
+   * @param schema Pinot schema used later to decide which ORC columns are kept; a warning is logged when null
+   * @throws RuntimeException wrapping any exception raised while opening the file or creating the row reader
+   */
+  private void init(String inputPath, Schema schema) {
+    Configuration conf = new Configuration();
+    LOGGER.info("Creating segment for {}", inputPath);
+    try {
+      Path orcReaderPath = new Path(LOCAL_FS_PREFIX + inputPath);
+      LOGGER.info("orc reader path is {}", orcReaderPath);
+      _reader = OrcFile.createReader(orcReaderPath, OrcFile.readerOptions(conf));
+      _orcSchema = _reader.getSchema();
+      LOGGER.info("ORC schema is {}", _orcSchema.toJson());
+
+      _pinotSchema = schema;
+      if (_pinotSchema == null) {
+        LOGGER.warn("Pinot schema is not set in segment generator config");
+      }
+      _recordReader = _reader.rows(_reader.options().schema(_orcSchema));
+    } catch (Exception e) {
+      // Pass the exception as the trailing argument so SLF4J logs its stack trace alongside the path
+      LOGGER.error("Caught exception initializing record reader at path {}", inputPath, e);
+      throw new RuntimeException("Caught exception initializing record reader at path " + inputPath, e);
+    }
+
+    // Create a row batch with max size 1
+    _reusableVectorizedRowBatch = _orcSchema.createRowBatch(1);
+  }
+
+  @Override
+  public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+    init(segmentGeneratorConfig.getInputFilePath(), 
segmentGeneratorConfig.getSchema());
+  }
+
+  /**
+   * Reports whether more rows remain, judged by the ORC record reader's progress not being exactly 1.
+   *
+   * NOTE(review): this relies on getProgress() returning exactly 1 once the input is exhausted - confirm.
+   *
+   * @throws RuntimeException wrapping the IOException if the progress cannot be read
+   */
+  @Override
+  public boolean hasNext() {
+    float progress;
+    try {
+      progress = _recordReader.getProgress();
+    } catch (IOException e) {
+      LOGGER.error("Could not get next record");
+      throw new RuntimeException(e);
+    }
+    return progress != 1;
+  }
+
+  /**
+   * Reads the next row into a newly allocated GenericRow and returns it.
+   *
+   * @throws IOException if the underlying ORC reader fails to read the next batch
+   */
+  @Override
+  public GenericRow next()
+      throws IOException {
+    GenericRow freshRow = new GenericRow();
+    return next(freshRow);
+  }
+
+  /**
+   * Reads the next batch from the ORC reader into the reusable row batch and copies it into the given row.
+   *
+   * @param reuse row object to populate
+   * @return the same {@code reuse} instance after it has been filled
+   * @throws IOException if the underlying ORC reader fails to read the next batch
+   */
+  @Override
+  public GenericRow next(GenericRow reuse)
+      throws IOException {
+    VectorizedRowBatch batch = _reusableVectorizedRowBatch;
+    // The boolean result of nextBatch is not inspected here
+    _recordReader.nextBatch(batch);
+    fillGenericRow(reuse, batch);
+    return reuse;
+  }
+
+  private void fillGenericRow(GenericRow genericRow, VectorizedRowBatch 
rowBatch) {
+    // ORC's TypeDescription is the equivalent of a schema. The way we will 
support ORC in Pinot
+    // will be to get the top level struct that contains all our fields and 
look through its
+    // children to determine the fields in our schemas.
+    if (_orcSchema.getCategory().equals(TypeDescription.Category.STRUCT)) {
+      for (int i = 0; i < _orcSchema.getChildren().size(); i++) {
+        // Get current column in schema
+        TypeDescription currColumn = _orcSchema.getChildren().get(i);
+        String currColumnName = _orcSchema.getFieldNames().get(i);
+        if (!_pinotSchema.getColumnNames().contains(currColumnName)) {
+          LOGGER.warn("Skipping column {} because it is not in pinot schema", 
currColumnName);
+          continue;
+        }
+        int currColRowIndex = currColumn.getId();
+        // Struct is top level, so the id of the struct is 0. However, the 
children start from 1+, etc, so we need to
+        // subtract one since the row batch we get has only children column 
vectors
+        ColumnVector vector = rowBatch.cols[currColRowIndex - 1];
 
 Review comment:
   You don't need to use `getId()` for indexing into the returned batch. The 
ids are an internal detail of ORC and reference individual streams in the 
serialized file. The VectorizedRowBatch that's returned to you will have 
exactly the fields that you specified when you set the schema in your 
ReaderOptions (or the file schema if you don't set it) and they will be in the 
same order as that schema. Therefore, you can just use `rowBatch.cols[i]` to 
get what you want.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to