Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5043#discussion_r152526459 --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java --- @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.orc; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.orc.OrcRowInputFormat.Predicate; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.expressions.Attribute; +import org.apache.flink.table.expressions.BinaryComparison; +import org.apache.flink.table.expressions.EqualTo; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.GreaterThan; +import org.apache.flink.table.expressions.GreaterThanOrEqual; +import org.apache.flink.table.expressions.IsFalse; +import org.apache.flink.table.expressions.IsNotNull; +import org.apache.flink.table.expressions.IsNull; +import org.apache.flink.table.expressions.IsTrue; +import org.apache.flink.table.expressions.LessThan; +import org.apache.flink.table.expressions.LessThanOrEqual; +import org.apache.flink.table.expressions.Literal; +import org.apache.flink.table.expressions.Not; +import org.apache.flink.table.expressions.NotEqualTo; +import org.apache.flink.table.expressions.Or; +import org.apache.flink.table.expressions.UnaryExpression; +import org.apache.flink.table.sources.BatchTableSource; +import org.apache.flink.table.sources.FilterableTableSource; +import org.apache.flink.table.sources.ProjectableTableSource; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.types.Row; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.orc.TypeDescription; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + 
* A TableSource to read ORC files. + * + * <p>The {@link OrcTableSource} supports projection and filter push-down.</p> + * + * <p>An {@link OrcTableSource} is used as shown in the example below. + * + * <pre> + * {@code + * String path = "file:///my/data/file.orc"; + * String schema = "struct<col1:boolean,col2:tinyint,col3:smallint,col4:int>"; + * OrcTableSource orcSrc = new OrcTableSource(path, schema); + * tEnv.registerTableSource("orcTable", orcSrc); + * Table res = tEnv.sql("SELECT * FROM orcTable"); + * } + * </pre> + */ +public class OrcTableSource + implements BatchTableSource<Row>, ProjectableTableSource<Row>, FilterableTableSource<Row> { + + private static final int DEFAULT_BATCH_SIZE = 1024; + + // path to read ORC files from + private final String path; + // schema of the ORC file + private final TypeDescription orcSchema; + // the schema of the Table + private final TableSchema tableSchema; + // the configuration to read the file + private final Configuration orcConfig; + // the number of rows to read in a batch + private final int batchSize; + + // type information of the data returned by the InputFormat + private final RowTypeInfo typeInfo; + // list of selected ORC fields to return + private final int[] selectedFields; + // list of predicates to apply + private final Predicate[] predicates; + + /** + * Creates an OrcTableSource with empty configuration. + * + * @param path The path to read the ORC files from. + * @param orcSchema The schema of the ORC files as String. + */ + public OrcTableSource(String path, String orcSchema) { + this(path, orcSchema, new Configuration(), DEFAULT_BATCH_SIZE); + } + + /** + * Creates an OrcTableSource from an ORC schema string. + * + * @param path The path to read the ORC files from. + * @param orcSchema The schema of the ORC files as String. + * @param orcConfig The configuration to read the ORC files. 
+ */ + public OrcTableSource(String path, String orcSchema, Configuration orcConfig) { + this(path, TypeDescription.fromString(orcSchema), orcConfig, DEFAULT_BATCH_SIZE); + } + + /** + * Creates an OrcTableSource from an ORC schema string. + * + * @param path The path to read the ORC files from. + * @param orcSchema The schema of the ORC files as String. + * @param orcConfig The configuration to read the ORC files. + * @param batchSize The number of Rows to read in a batch, default is 1024. + */ + public OrcTableSource(String path, String orcSchema, Configuration orcConfig, int batchSize) { + this(path, TypeDescription.fromString(orcSchema), orcConfig, batchSize); + } + + /** + * Creates an OrcTableSource from an ORC TypeDescription. + * + * @param path The path to read the ORC files from. + * @param orcSchema The schema of the ORC files as TypeDescription. + * @param orcConfig The configuration to read the ORC files. + * @param batchSize The number of Rows to read in a batch, default is 1024. + */ + public OrcTableSource(String path, TypeDescription orcSchema, Configuration orcConfig, int batchSize) { + this(path, orcSchema, orcConfig, batchSize, null, null); + } + + private OrcTableSource(String path, TypeDescription orcSchema, Configuration orcConfig, + int batchSize, int[] selectedFields, Predicate[] predicates) { + + this.path = path; --- End diff -- Should we validate the constructor parameters here (e.g., check that path, orcSchema, and orcConfig are non-null and that batchSize is positive)?
---