Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5043#discussion_r152503422 --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java --- @@ -0,0 +1,747 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.orc; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.Row; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; + +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.StripeInformation; +import org.apache.orc.TypeDescription; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.orc.OrcUtils.fillRows; + +/** + * InputFormat to read ORC files. + */ +public class OrcRowInputFormat extends FileInputFormat<Row> implements ResultTypeQueryable<Row> { + + private static final Logger LOG = LoggerFactory.getLogger(OrcRowInputFormat.class); + // the number of rows read in a batch + private static final int DEFAULT_BATCH_SIZE = 1000; + + // the number of fields rows to read in a batch + private int batchSize; + // the configuration to read with + private Configuration conf; + // the schema of the ORC files to read + private TypeDescription schema; + + // the fields of the ORC schema that the returned Rows are composed of. + private int[] selectedFields; + // the type information of the Rows returned by this InputFormat. + private transient RowTypeInfo rowType; + + // the ORC reader + private transient RecordReader orcRowsReader; + // the vectorized row data to be read in a batch + private transient VectorizedRowBatch rowBatch; + // the vector of rows that is read in a batch + private transient Row[] rows; + + // the number of rows in the current batch + private transient int rowsInBatch; + // the index of the next row to return + private transient int nextRow; + + private ArrayList<Predicate> conjunctPredicates = new ArrayList<>(); + + /** + * Creates an OrcRowInputFormat. + * + * @param path The path to read ORC files from. + * @param schemaString The schema of the ORC files as String. + * @param orcConfig The configuration to read the ORC files with. + */ + public OrcRowInputFormat(String path, String schemaString, Configuration orcConfig) { + this(path, TypeDescription.fromString(schemaString), orcConfig, DEFAULT_BATCH_SIZE); + } + + /** + * Creates an OrcRowInputFormat. + * + * @param path The path to read ORC files from. + * @param schemaString The schema of the ORC files as String. + * @param orcConfig The configuration to read the ORC files with. + * @param batchSize The number of Row objects to read in a batch. + */ + public OrcRowInputFormat(String path, String schemaString, Configuration orcConfig, int batchSize) { + this(path, TypeDescription.fromString(schemaString), orcConfig, batchSize); + } + + /** + * Creates an OrcRowInputFormat. + * + * @param path The path to read ORC files from. + * @param orcSchema The schema of the ORC files as ORC TypeDescription. + * @param orcConfig The configuration to read the ORC files with. + * @param batchSize The number of Row objects to read in a batch. + */ + public OrcRowInputFormat(String path, TypeDescription orcSchema, Configuration orcConfig, int batchSize) { + super(new Path(path)); + + // configure OrcInputFormat --- End diff -- `OrcRowInputFormat`
---