wagnermarkd commented on a change in pull request #3994: Adding ORC reader URL: https://github.com/apache/incubator-pinot/pull/3994#discussion_r267959634
########## File path: pinot-orc/src/main/java/org/apache/pinot/orc/data/readers/ORCRecordReader.java ########## @@ -0,0 +1,207 @@ +package org.apache.pinot.orc.data.readers; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.ByteWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.ShortWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.TypeDescription; +import org.apache.orc.mapred.OrcList; +import org.apache.orc.mapred.OrcMapredRecordReader; +import org.apache.pinot.common.data.Schema; +import org.apache.pinot.core.data.GenericRow; 
+import org.apache.pinot.core.data.readers.RecordReader; +import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The ORCRecordReader uses a VectorizedRowBatch, which we convert to a Writable. Then, we convert these + * Writable objects to primitives that we can then store in a GenericRow. + * + * When new data types are added to Pinot, we will need to update them here as well. + * Note that not all ORC types are supported; we only support the ORC types that correspond to either + * primitives or multivalue columns in Pinot, which is similar to other record readers. + */ +public class ORCRecordReader implements RecordReader { + + private Schema _pinotSchema; /* Pinot schema selecting which ORC columns to extract */ + private TypeDescription _orcSchema; /* ORC file schema; fillGenericRow expects a top-level STRUCT */ + /* NOTE(review): the next three fields are package-private — make them private unless something outside this class needs them. */ Reader _reader; + org.apache.orc.RecordReader _recordReader; + VectorizedRowBatch _reusableVectorizedRowBatch; /* reused across next() calls; sized to 1 row in init() */ + + /* Only local-filesystem input is supported: every input path is prefixed with file://. */ public static final String LOCAL_FS_PREFIX = "file://"; + + private static final Logger LOGGER = LoggerFactory.getLogger(ORCRecordReader.class); + + /* Opens the ORC file at inputPath (local FS only), captures its schema, and prepares the reusable single-row batch. Any failure is rethrown wrapped in RuntimeException. */ private void init(String inputPath, Schema schema) { + Configuration conf = new Configuration(); + LOGGER.info("Creating segment for {}", inputPath); + try { + Path orcReaderPath = new Path(LOCAL_FS_PREFIX + inputPath); + LOGGER.info("orc reader path is {}", orcReaderPath); + _reader = OrcFile.createReader(orcReaderPath, OrcFile.readerOptions(conf)); + _orcSchema = _reader.getSchema(); + LOGGER.info("ORC schema is {}", _orcSchema.toJson()); + + _pinotSchema = schema; + if (_pinotSchema == null) { + /* NOTE(review): a null Pinot schema is only warned about here, but fillGenericRow dereferences _pinotSchema — this would NPE later; consider failing fast instead. */ LOGGER.warn("Pinot schema is not set in segment generator config"); + } + _recordReader = _reader.rows(_reader.options().schema(_orcSchema)); + } catch (Exception e) { + /* NOTE(review): pass 'e' as the last argument so SLF4J records the stack trace; as written only the message line is logged before rethrow. */ LOGGER.error("Caught exception initializing record reader at path {}", inputPath); + throw new RuntimeException(e); + } + + // Create a row batch with max size 1 + _reusableVectorizedRowBatch = _orcSchema.createRowBatch(1); + } + + @Override + public void 
init(SegmentGeneratorConfig segmentGeneratorConfig) { + init(segmentGeneratorConfig.getInputFilePath(), segmentGeneratorConfig.getSchema()); + } + + @Override + public boolean hasNext() { + try { + /* NOTE(review): getProgress() reports a completion fraction; testing it '!= 1' to detect remaining rows looks fragile (floating-point comparison) — consider counting rows read against _reader.getNumberOfRows() instead. TODO confirm against the ORC RecordReader API. */ return _recordReader.getProgress() != 1; + } catch (IOException e) { + /* NOTE(review): include 'e' in the log call so the cause is recorded with the message. */ LOGGER.error("Could not get next record"); + throw new RuntimeException(e); + } + } + + @Override + public GenericRow next() + throws IOException { + return next(new GenericRow()); + } + + /* Reads one row (batch of size 1) into the reusable batch and copies it into 'reuse'. NOTE(review): 'reuse' is not cleared first, so any field skipped by fillGenericRow keeps its value from the previous row. */ @Override + public GenericRow next(GenericRow reuse) + throws IOException { + _recordReader.nextBatch(_reusableVectorizedRowBatch); + fillGenericRow(reuse, _reusableVectorizedRowBatch); + return reuse; + } + + private void fillGenericRow(GenericRow genericRow, VectorizedRowBatch rowBatch) { + // ORC's TypeDescription is the equivalent of a schema. The way we will support ORC in Pinot + // will be to get the top level struct that contains all our fields and look through its + // children to determine the fields in our schemas. + if (_orcSchema.getCategory().equals(TypeDescription.Category.STRUCT)) { + for (int i = 0; i < _orcSchema.getChildren().size(); i++) { + // Get current column in schema + TypeDescription currColumn = _orcSchema.getChildren().get(i); + String currColumnName = _orcSchema.getFieldNames().get(i); + if (!_pinotSchema.getColumnNames().contains(currColumnName)) { + LOGGER.warn("Skipping column {} because it is not in pinot schema", currColumnName); + continue; + } + int currColRowIndex = currColumn.getId(); + // Struct is top level, so the id of the struct is 0. However, the children start from 1+, etc, so we need to + // subtract one since the row batch we get has only children column vectors + ColumnVector vector = rowBatch.cols[currColRowIndex - 1]; Review comment: That `currColRowIndex - 1` happens to work is only because you've been using flat schemas. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
