[FLINK-2170] [connectors] Add OrcRowInputFormat and OrcTableSource.

This closes #5043.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35517f12
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35517f12
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35517f12

Branch: refs/heads/release-1.4
Commit: 35517f1291293f73f4466a4fdbed4296b2dd80a5
Parents: 7868ea4
Author: Fabian Hueske <[email protected]>
Authored: Mon Nov 13 14:54:54 2017 +0100
Committer: Fabian Hueske <[email protected]>
Committed: Thu Nov 23 00:35:58 2017 +0100

----------------------------------------------------------------------
 docs/dev/table/sourceSinks.md                   |    49 +
 flink-connectors/flink-orc/pom.xml              |    87 +-
 .../org/apache/flink/orc/OrcRowInputFormat.java |   745 ++
 .../org/apache/flink/orc/OrcTableSource.java    |   455 +-
 .../java/org/apache/flink/orc/OrcUtils.java     |  2379 ++--
 .../org/apache/flink/orc/RowOrcInputFormat.java |   241 -
 .../apache/flink/orc/OrcRowInputFormatTest.java |   795 ++
 .../apache/flink/orc/OrcTableSourceITCase.java  |   134 +-
 .../apache/flink/orc/OrcTableSourceTest.java    |   266 +-
 .../java/org/apache/flink/orc/OrcUtilsTest.java |   148 +
 .../apache/flink/orc/RowOrcInputFormatTest.java |   472 -
 .../test/resources/TestOrcFile.emptyFile.orc    |   Bin 523 -> 0 bytes
 .../TestOrcFile.listliststructlong.orc          |   Bin 845 -> 0 bytes
 .../src/test/resources/TestOrcFile.listlong.orc |   Bin 627 -> 0 bytes
 .../test/resources/TestOrcFile.liststring.orc   |   Bin 1298 -> 0 bytes
 .../src/test/resources/TestOrcFile.test1.orc    |   Bin 1711 -> 0 bytes
 .../test/resources/TestOrcFile.testDate1900.dat | 10000 -----------------
 .../test/resources/TestOrcFile.testDate1900.orc |   Bin 30941 -> 0 bytes
 .../flink-orc/src/test/resources/decimal.dat    |  6000 ----------
 .../flink-orc/src/test/resources/decimal.orc    |   Bin 16337 -> 0 bytes
 .../src/test/resources/demo-11-none.orc         |   Bin 5147970 -> 0 bytes
 .../src/test/resources/test-data-decimal.orc    |   Bin 0 -> 16337 bytes
 .../src/test/resources/test-data-flat.orc       |   Bin 0 -> 408522 bytes
 .../src/test/resources/test-data-nested.orc     |   Bin 0 -> 1711 bytes
 .../src/test/resources/test-data-nestedlist.orc |   Bin 0 -> 845 bytes
 .../src/test/resources/test-data-timetypes.orc  |   Bin 0 -> 30941 bytes
 .../flink/api/java/typeutils/RowTypeInfo.java   |    17 +
 .../logical/FlinkLogicalTableSourceScan.scala   |    16 +-
 .../table/plan/util/RexProgramExtractor.scala   |    12 +
 29 files changed, 3305 insertions(+), 18511 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/docs/dev/table/sourceSinks.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index 0b4bdbe..7387358 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -47,6 +47,7 @@ A custom `TableSource` can be defined by implementing the 
`BatchTableSource` or
 | `Kafka08AvroTableSource` | `flink-connector-kafka-0.8` | N | Y | A 
`TableSource` for Avro-encoded Kafka 0.8 topics.
 | `Kafka08JsonTableSource` | `flink-connector-kafka-0.8` | N | Y | A 
`TableSource` for flat Json-encoded Kafka 0.8 topics.
 | `CsvTableSource` | `flink-table` | Y | Y | A simple `TableSource` for CSV 
files.
+| `OrcTableSource` | `flink-orc` | Y | N | A `TableSource` for ORC files.
 
 All sources that come with the `flink-table` dependency are directly available 
for Table API or SQL programs. For all other table sources, you have to add the 
respective dependency in addition to the `flink-table` dependency.
 
@@ -485,6 +486,54 @@ val csvTableSource = CsvTableSource
 
 {% top %}
 
+### OrcTableSource
+
+The `OrcTableSource` reads [ORC files](https://orc.apache.org). ORC is a file 
format for structured data and stores the data in a compressed, columnar 
representation. ORC is very storage efficient and supports projection and 
filter push-down.
+
+An `OrcTableSource` is created as shown below:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+// create Hadoop Configuration
+Configuration config = new Configuration();
+
+OrcTableSource orcTableSource = OrcTableSource.builder()
+  // path to ORC file(s)
+  .path("file:///path/to/data")
+  // schema of ORC files
+  
.forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>")
+  // Hadoop configuration
+  .withConfiguration(config)
+  // build OrcTableSource
+  .build();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+// create Hadoop Configuration
+val config = new Configuration()
+
+val orcTableSource = OrcTableSource.builder()
+  // path to ORC file(s)
+  .path("file:///path/to/data")
+  // schema of ORC files
+  
.forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>")
+  // Hadoop configuration
+  .withConfiguration(config)
+  // build OrcTableSource
+  .build()
+{% endhighlight %}
+</div>
+</div>
+
+**Note:** The `OrcTableSource` does not support ORC's `Union` type yet.
+
+{% top %}
+
 Provided TableSinks
 -------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/flink-connectors/flink-orc/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/pom.xml 
b/flink-connectors/flink-orc/pom.xml
index 1ac7eaa..4866cd7 100644
--- a/flink-connectors/flink-orc/pom.xml
+++ b/flink-connectors/flink-orc/pom.xml
@@ -40,22 +40,39 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       
<artifactId>flink-table_${scala.binary.version}</artifactId>
+                       
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
-                       <scope>compile</scope>
+                       <scope>provided</scope>
                </dependency>
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+                       
<artifactId>flink-table_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
-                       <scope>compile</scope>
+                       <scope>provided</scope>
+                       <!-- Projects depending on this project, won't depend 
on flink-table. -->
+                       <optional>true</optional>
                </dependency>
 
                <dependency>
                        <groupId>org.apache.orc</groupId>
                        <artifactId>orc-core</artifactId>
-                       <version>1.4.0</version>
+                       <version>1.4.1</version>
+                       <exclusions>
+                               <!-- Exclude ORC's Hadoop dependency and pull 
in Flink's shaded Hadoop. -->
+                               <exclusion>
+                                       <groupId>org.apache.hadoop</groupId>
+                                       <artifactId>hadoop-common</artifactId>
+                               </exclusion>
+                       </exclusions>
+               </dependency>
+
+               <!-- Replacement for ORC's Hadoop dependency. -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-shaded-hadoop2</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
                </dependency>
 
                <!-- test dependencies -->
@@ -88,65 +105,7 @@ under the License.
                        <scope>test</scope>
                        <type>test-jar</type>
                </dependency>
-       </dependencies>
 
-       <build>
-
-               <pluginManagement>
-                       <plugins>
-                               <!--This plugin's configuration is used to 
store Eclipse m2e settings only. It has no influence on the Maven build 
itself.-->
-                               <plugin>
-                                       <groupId>org.eclipse.m2e</groupId>
-                                       
<artifactId>lifecycle-mapping</artifactId>
-                                       <version>1.0.0</version>
-                                       <configuration>
-                                               <lifecycleMappingMetadata>
-                                                       <pluginExecutions>
-                                                               
<pluginExecution>
-                                                                       
<pluginExecutionFilter>
-                                                                               
<groupId>org.apache.maven.plugins</groupId>
-                                                                               
<artifactId>maven-assembly-plugin</artifactId>
-                                                                               
<versionRange>[2.4,)</versionRange>
-                                                                               
<goals>
-                                                                               
        <goal>single</goal>
-                                                                               
</goals>
-                                                                       
</pluginExecutionFilter>
-                                                                       <action>
-                                                                               
<ignore/>
-                                                                       
</action>
-                                                               
</pluginExecution>
-                                                               
<pluginExecution>
-                                                                       
<pluginExecutionFilter>
-                                                                               
<groupId>org.apache.maven.plugins</groupId>
-                                                                               
<artifactId>maven-clean-plugin</artifactId>
-                                                                               
<versionRange>[1,)</versionRange>
-                                                                               
<goals>
-                                                                               
        <goal>clean</goal>
-                                                                               
</goals>
-                                                                       
</pluginExecutionFilter>
-                                                                       <action>
-                                                                               
<ignore/>
-                                                                       
</action>
-                                                               
</pluginExecution>
-                                                               
<pluginExecution>
-                                                                       
<pluginExecutionFilter>
-                                                                               
<groupId>org.apache.avro</groupId>
-                                                                               
<artifactId>avro-maven-plugin</artifactId>
-                                                                               
<versionRange>[1.7.7,)</versionRange>
-                                                                               
<goals>
-                                                                               
        <goal>schema</goal>
-                                                                               
</goals>
-                                                                       
</pluginExecutionFilter>
-                                                                       <action>
-                                                                               
<ignore/>
-                                                                       
</action>
-                                                               
</pluginExecution>
-                                                       </pluginExecutions>
-                                               </lifecycleMappingMetadata>
-                                       </configuration>
-                               </plugin>
-                       </plugins>
-               </pluginManagement>
-       </build>
+       </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
 
b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
new file mode 100644
index 0000000..4353cbc
--- /dev/null
+++ 
b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
@@ -0,0 +1,745 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.orc.OrcUtils.fillRows;
+
+/**
+ * InputFormat to read ORC files.
+ */
+public class OrcRowInputFormat extends FileInputFormat<Row> implements 
ResultTypeQueryable<Row> {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(OrcRowInputFormat.class);
+       // the number of rows read in a batch
+       private static final int DEFAULT_BATCH_SIZE = 1000;
+
+       // the number of fields rows to read in a batch
+       private int batchSize;
+       // the configuration to read with
+       private Configuration conf;
+       // the schema of the ORC files to read
+       private TypeDescription schema;
+
+       // the fields of the ORC schema that the returned Rows are composed of.
+       private int[] selectedFields;
+       // the type information of the Rows returned by this InputFormat.
+       private transient RowTypeInfo rowType;
+
+       // the ORC reader
+       private transient RecordReader orcRowsReader;
+       // the vectorized row data to be read in a batch
+       private transient VectorizedRowBatch rowBatch;
+       // the vector of rows that is read in a batch
+       private transient Row[] rows;
+
+       // the number of rows in the current batch
+       private transient int rowsInBatch;
+       // the index of the next row to return
+       private transient int nextRow;
+
+       private ArrayList<Predicate> conjunctPredicates = new ArrayList<>();
+
+       /**
+        * Creates an OrcRowInputFormat.
+        *
+        * @param path The path to read ORC files from.
+        * @param schemaString The schema of the ORC files as String.
+        * @param orcConfig The configuration to read the ORC files with.
+        */
+       public OrcRowInputFormat(String path, String schemaString, 
Configuration orcConfig) {
+               this(path, TypeDescription.fromString(schemaString), orcConfig, 
DEFAULT_BATCH_SIZE);
+       }
+
+       /**
+        * Creates an OrcRowInputFormat.
+        *
+        * @param path The path to read ORC files from.
+        * @param schemaString The schema of the ORC files as String.
+        * @param orcConfig The configuration to read the ORC files with.
+        * @param batchSize The number of Row objects to read in a batch.
+        */
+       public OrcRowInputFormat(String path, String schemaString, 
Configuration orcConfig, int batchSize) {
+               this(path, TypeDescription.fromString(schemaString), orcConfig, 
batchSize);
+       }
+
+       /**
+        * Creates an OrcRowInputFormat.
+        *
+        * @param path The path to read ORC files from.
+        * @param orcSchema The schema of the ORC files as ORC TypeDescription.
+        * @param orcConfig The configuration to read the ORC files with.
+        * @param batchSize The number of Row objects to read in a batch.
+        */
+       public OrcRowInputFormat(String path, TypeDescription orcSchema, 
Configuration orcConfig, int batchSize) {
+               super(new Path(path));
+
+               // configure OrcRowInputFormat
+               this.schema = orcSchema;
+               this.rowType = (RowTypeInfo) OrcUtils.schemaToTypeInfo(schema);
+               this.conf = orcConfig;
+               this.batchSize = batchSize;
+
+               // set default selection mask, i.e., all fields.
+               this.selectedFields = new int[this.schema.getChildren().size()];
+               for (int i = 0; i < selectedFields.length; i++) {
+                       this.selectedFields[i] = i;
+               }
+       }
+
+       /**
+        * Adds a filter predicate to reduce the number of rows to be returned 
by the input format.
+        * Multiple conjunctive predicates can be added by calling this method 
multiple times.
+        *
+        * <p>Note: Predicates can significantly reduce the amount of data that 
is read.
+        * However, the OrcRowInputFormat does not guarantee that all returned 
rows qualify the
+        * predicates. Moreover, predicates are only applied if the referenced 
field is among the
+        * selected fields.
+        *
+        * @param predicate The filter predicate.
+        */
+       public void addPredicate(Predicate predicate) {
+               // validate
+               validatePredicate(predicate);
+               // add predicate
+               this.conjunctPredicates.add(predicate);
+       }
+
+       private void validatePredicate(Predicate pred) {
+               if (pred instanceof ColumnPredicate) {
+                       // check column name
+                       String colName = ((ColumnPredicate) pred).columnName;
+                       if (!this.schema.getFieldNames().contains(colName)) {
+                               throw new IllegalArgumentException("Predicate 
cannot be applied. " +
+                                       "Column '" + colName + "' does not 
exist in ORC schema.");
+                       }
+               } else if (pred instanceof Not) {
+                       validatePredicate(((Not) pred).child());
+               } else if (pred instanceof Or) {
+                       for (Predicate p : ((Or) pred).children()) {
+                               validatePredicate(p);
+                       }
+               }
+       }
+
+       /**
+        * Selects the fields from the ORC schema that are returned by 
InputFormat.
+        *
+        * @param selectedFields The indices of the fields of the ORC schema 
that are returned by the InputFormat.
+        */
+       public void selectFields(int... selectedFields) {
+               // set field mapping
+               this.selectedFields = selectedFields;
+               // adapt result type
+               this.rowType = RowTypeInfo.projectFields(this.rowType, 
selectedFields);
+       }
+
+       /**
+        * Computes the ORC projection mask of the fields to include from the 
selected fields.rowOrcInputFormat.nextRecord(null).
+        *
+        * @return The ORC projection mask.
+        */
+       private boolean[] computeProjectionMask() {
+               // mask with all fields of the schema
+               boolean[] projectionMask = new boolean[schema.getMaximumId() + 
1];
+               // for each selected field
+               for (int inIdx : selectedFields) {
+                       // set all nested fields of a selected field to true
+                       TypeDescription fieldSchema = 
schema.getChildren().get(inIdx);
+                       for (int i = fieldSchema.getId(); i <= 
fieldSchema.getMaximumId(); i++) {
+                               projectionMask[i] = true;
+                       }
+               }
+               return projectionMask;
+       }
+
+       @Override
+       public void openInputFormat() throws IOException {
+               super.openInputFormat();
+               // create and initialize the row batch
+               this.rows = new Row[batchSize];
+               for (int i = 0; i < batchSize; i++) {
+                       rows[i] = new Row(selectedFields.length);
+               }
+       }
+
+       @Override
+       public void open(FileInputSplit fileSplit) throws IOException {
+
+               LOG.debug("Opening ORC file {}", fileSplit.getPath());
+
+               // open ORC file and create reader
+               org.apache.hadoop.fs.Path hPath = new 
org.apache.hadoop.fs.Path(fileSplit.getPath().getPath());
+               Reader orcReader = OrcFile.createReader(hPath, 
OrcFile.readerOptions(conf));
+
+               // get offset and length for the stripes that start in the split
+               Tuple2<Long, Long> offsetAndLength = 
getOffsetAndLengthForSplit(fileSplit, getStripes(orcReader));
+
+               // create ORC row reader configuration
+               Reader.Options options = getOptions(orcReader)
+                       .schema(schema)
+                       .range(offsetAndLength.f0, offsetAndLength.f1)
+                       .useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf))
+                       
.skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf))
+                       
.tolerateMissingSchema(OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf));
+
+               // configure filters
+               if (!conjunctPredicates.isEmpty()) {
+                       SearchArgument.Builder b = 
SearchArgumentFactory.newBuilder();
+                       b = b.startAnd();
+                       for (Predicate predicate : conjunctPredicates) {
+                               predicate.add(b);
+                       }
+                       b = b.end();
+                       options.searchArgument(b.build(), new String[]{});
+               }
+
+               // configure selected fields
+               options.include(computeProjectionMask());
+
+               // create ORC row reader
+               this.orcRowsReader = orcReader.rows(options);
+
+               // assign ids
+               this.schema.getId();
+               // create row batch
+               this.rowBatch = schema.createRowBatch(batchSize);
+               rowsInBatch = 0;
+               nextRow = 0;
+       }
+
+       @VisibleForTesting
+       Reader.Options getOptions(Reader orcReader) {
+               return orcReader.options();
+       }
+
+       @VisibleForTesting
+       List<StripeInformation> getStripes(Reader orcReader) {
+               return orcReader.getStripes();
+       }
+
+       private Tuple2<Long, Long> getOffsetAndLengthForSplit(FileInputSplit 
split, List<StripeInformation> stripes) {
+               long splitStart = split.getStart();
+               long splitEnd = splitStart + split.getLength();
+
+               long readStart = Long.MAX_VALUE;
+               long readEnd = Long.MIN_VALUE;
+
+               for (StripeInformation s : stripes) {
+                       if (splitStart <= s.getOffset() && s.getOffset() < 
splitEnd) {
+                               // stripe starts in split, so it is included
+                               readStart = Math.min(readStart, s.getOffset());
+                               readEnd = Math.max(readEnd, s.getOffset() + 
s.getLength());
+                       }
+               }
+
+               if (readStart < Long.MAX_VALUE) {
+                       // at least one split is included
+                       return Tuple2.of(readStart, readEnd - readStart);
+               } else {
+                       return Tuple2.of(0L, 0L);
+               }
+       }
+
+       @Override
+       public void close() throws IOException {
+               if (orcRowsReader != null) {
+                       this.orcRowsReader.close();
+               }
+               this.orcRowsReader = null;
+       }
+
+       @Override
+       public void closeInputFormat() throws IOException {
+               this.rows = null;
+               this.rows = null;
+               this.schema = null;
+               this.rowBatch = null;
+       }
+
+       @Override
+       public boolean reachedEnd() throws IOException {
+               return !ensureBatch();
+       }
+
+       /**
+        * Checks if there is at least one row left in the batch to return.
+        * If no more row are available, it reads another batch of rows.
+        *
+        * @return Returns true if there is one more row to return, false 
otherwise.
+        * @throws IOException throw if an exception happens while reading a 
batch.
+        */
+       private boolean ensureBatch() throws IOException {
+
+               if (nextRow >= rowsInBatch) {
+                       // No more rows available in the Rows array.
+                       nextRow = 0;
+                       // Try to read the next batch if rows from the ORC file.
+                       boolean moreRows = orcRowsReader.nextBatch(rowBatch);
+
+                       if (moreRows) {
+                               // Load the data into the Rows array.
+                               rowsInBatch = fillRows(rows, schema, rowBatch, 
selectedFields);
+                       }
+                       return moreRows;
+               }
+               // there is at least one Row left in the Rows array.
+               return true;
+       }
+
+       @Override
+       public Row nextRecord(Row reuse) throws IOException {
+               // return the next row
+               return rows[this.nextRow++];
+       }
+
+       @Override
+       public TypeInformation<Row> getProducedType() {
+               return rowType;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Custom serialization methods
+       // 
--------------------------------------------------------------------------------------------
+
+       private void writeObject(ObjectOutputStream out) throws IOException {
+               out.writeInt(batchSize);
+               this.conf.write(out);
+               out.writeUTF(schema.toString());
+
+               out.writeInt(selectedFields.length);
+               for (int f : selectedFields) {
+                       out.writeInt(f);
+               }
+
+               out.writeInt(conjunctPredicates.size());
+               for (Predicate p : conjunctPredicates) {
+                       out.writeObject(p);
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+               batchSize = in.readInt();
+               org.apache.hadoop.conf.Configuration configuration = new 
org.apache.hadoop.conf.Configuration();
+               configuration.readFields(in);
+
+               if (this.conf == null) {
+                       this.conf = configuration;
+               }
+               this.schema = TypeDescription.fromString(in.readUTF());
+
+               this.selectedFields = new int[in.readInt()];
+               for (int i = 0; i < selectedFields.length; i++) {
+                       this.selectedFields[i] = in.readInt();
+               }
+
+               this.conjunctPredicates = new ArrayList<>();
+               int numPreds = in.readInt();
+               for (int i = 0; i < numPreds; i++) {
+                       conjunctPredicates.add((Predicate) in.readObject());
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Classes to define predicates
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * A filter predicate that can be evaluated by the OrcRowInputFormat.
+        */
+       public abstract static class Predicate implements Serializable {
+               protected abstract SearchArgument.Builder 
add(SearchArgument.Builder builder);
+       }
+
+       abstract static class ColumnPredicate extends Predicate {
+               final String columnName;
+               final PredicateLeaf.Type literalType;
+
+               ColumnPredicate(String columnName, PredicateLeaf.Type 
literalType) {
+                       this.columnName = columnName;
+                       this.literalType = literalType;
+               }
+
+               Object castLiteral(Serializable literal) {
+
+                       switch (literalType) {
+                               case LONG:
+                                       if (literal instanceof Byte) {
+                                               return new Long((Byte) literal);
+                                       } else if (literal instanceof Short) {
+                                               return new Long((Short) 
literal);
+                                       } else if (literal instanceof Integer) {
+                                               return new Long((Integer) 
literal);
+                                       } else if (literal instanceof Long) {
+                                               return literal;
+                                       } else {
+                                               throw new 
IllegalArgumentException("A predicate on a LONG column requires an integer " +
+                                                       "literal, i.e., Byte, 
Short, Integer, or Long.");
+                                       }
+                               case FLOAT:
+                                       if (literal instanceof Float) {
+                                               return new Double((Float) 
literal);
+                                       } else if (literal instanceof Double) {
+                                               return literal;
+                                       } else if (literal instanceof 
BigDecimal) {
+                                               return ((BigDecimal) 
literal).doubleValue();
+                                       } else {
+                                               throw new 
IllegalArgumentException("A predicate on a FLOAT column requires a floating " +
+                                                       "literal, i.e., Float 
or Double.");
+                                       }
+                               case STRING:
+                                       if (literal instanceof String) {
+                                               return literal;
+                                       } else {
+                                               throw new 
IllegalArgumentException("A predicate on a STRING column requires a floating " +
+                                                       "literal, i.e., Float 
or Double.");
+                                       }
+                               case BOOLEAN:
+                                       if (literal instanceof Boolean) {
+                                               return literal;
+                                       } else {
+                                               throw new 
IllegalArgumentException("A predicate on a BOOLEAN column requires a Boolean 
literal.");
+                                       }
+                               case DATE:
+                                       if (literal instanceof Date) {
+                                               return literal;
+                                       } else {
+                                               throw new 
IllegalArgumentException("A predicate on a DATE column requires a java.sql.Date 
literal.");
+                                       }
+                               case TIMESTAMP:
+                                       if (literal instanceof Timestamp) {
+                                               return literal;
+                                       } else {
+                                               throw new 
IllegalArgumentException("A predicate on a TIMESTAMP column requires a 
java.sql.Timestamp literal.");
+                                       }
+                               case DECIMAL:
+                                       if (literal instanceof BigDecimal) {
+                                               return new 
HiveDecimalWritable(HiveDecimal.create((BigDecimal) literal));
+                                       } else {
+                                               throw new 
IllegalArgumentException("A predicate on a DECIMAL column requires a BigDecimal 
literal.");
+                                       }
+                               default:
+                                       throw new 
IllegalArgumentException("Unknown literal type " + literalType);
+                       }
+               }
+       }
+
+       abstract static class BinaryPredicate extends ColumnPredicate {
+               final Serializable literal;
+
+               BinaryPredicate(String columnName, PredicateLeaf.Type 
literalType, Serializable literal) {
+                       super(columnName, literalType);
+                       this.literal = literal;
+               }
+       }
+
+       /**
+        * An EQUALS predicate that can be evaluated by the OrcRowInputFormat.
+        */
+       public static class Equals extends BinaryPredicate {
+               /**
+                * Creates an EQUALS predicate.
+                *
+                * @param columnName The column to check.
+                * @param literalType The type of the literal.
+                * @param literal The literal value to check the column against.
+                */
+               public Equals(String columnName, PredicateLeaf.Type 
literalType, Serializable literal) {
+                       super(columnName, literalType, literal);
+               }
+
+               @Override
+               protected SearchArgument.Builder add(SearchArgument.Builder 
builder) {
+                       return builder.equals(columnName, literalType, 
castLiteral(literal));
+               }
+
+               @Override
+               public String toString() {
+                       return columnName + " = " + literal;
+               }
+       }
+
+       /**
+        * An EQUALS predicate that can be evaluated with Null safety by the 
OrcRowInputFormat.
+        */
+       public static class NullSafeEquals extends BinaryPredicate {
+               /**
+                * Creates a null-safe EQUALS predicate.
+                *
+                * @param columnName The column to check.
+                * @param literalType The type of the literal.
+                * @param literal The literal value to check the column against.
+                */
+               public NullSafeEquals(String columnName, PredicateLeaf.Type 
literalType, Serializable literal) {
+                       super(columnName, literalType, literal);
+               }
+
+               @Override
+               protected SearchArgument.Builder add(SearchArgument.Builder 
builder) {
+                       return builder.nullSafeEquals(columnName, literalType, 
castLiteral(literal));
+               }
+
+               @Override
+               public String toString() {
+                       return columnName + " = " + literal;
+               }
+       }
+
+       /**
+        * A LESS_THAN predicate that can be evaluated by the OrcRowInputFormat.
+        */
+       public static class LessThan extends BinaryPredicate {
+               /**
+                * Creates a LESS_THAN predicate.
+                *
+                * @param columnName The column to check.
+                * @param literalType The type of the literal.
+                * @param literal The literal value to check the column against.
+                */
+               public LessThan(String columnName, PredicateLeaf.Type 
literalType, Serializable literal) {
+                       super(columnName, literalType, literal);
+               }
+
+               @Override
+               protected SearchArgument.Builder add(SearchArgument.Builder 
builder) {
+                       return builder.lessThan(columnName, literalType, 
castLiteral(literal));
+               }
+
+               @Override
+               public String toString() {
+                       return columnName + " < " + literal;
+               }
+       }
+
+       /**
+        * A LESS_THAN_EQUALS predicate that can be evaluated by the 
OrcRowInputFormat.
+        */
+       public static class LessThanEquals extends BinaryPredicate {
+               /**
+                * Creates a LESS_THAN_EQUALS predicate.
+                *
+                * @param columnName The column to check.
+                * @param literalType The type of the literal.
+                * @param literal The literal value to check the column against.
+                */
+               public LessThanEquals(String columnName, PredicateLeaf.Type 
literalType, Serializable literal) {
+                       super(columnName, literalType, literal);
+               }
+
+               @Override
+               protected SearchArgument.Builder add(SearchArgument.Builder 
builder) {
+                       return builder.lessThanEquals(columnName, literalType, 
castLiteral(literal));
+               }
+
+               @Override
+               public String toString() {
+                       return columnName + " <= " + literal;
+               }
+       }
+
+       /**
+        * An IS_NULL predicate that can be evaluated by the OrcRowInputFormat.
+        */
+       public static class IsNull extends ColumnPredicate {
+               /**
+                * Creates an IS_NULL predicate.
+                *
+                * @param columnName The column to check for null.
+                * @param literalType The type of the column to check for null.
+                */
+               public IsNull(String columnName, PredicateLeaf.Type 
literalType) {
+                       super(columnName, literalType);
+               }
+
+               @Override
+               protected SearchArgument.Builder add(SearchArgument.Builder 
builder) {
+                       return builder.isNull(columnName, literalType);
+               }
+
+               @Override
+               public String toString() {
+                       return columnName + " IS NULL";
+               }
+       }
+
+       /**
+        * An BETWEEN predicate that can be evaluated by the OrcRowInputFormat.
+        */
+       public static class Between extends ColumnPredicate {
+               private Serializable lowerBound;
+               private Serializable upperBound;
+
+               /**
+                * Creates an BETWEEN predicate.
+                *
+                * @param columnName The column to check.
+                * @param literalType The type of the literals.
+                * @param lowerBound The literal value of the (inclusive) lower 
bound to check the column against.
+                * @param upperBound The literal value of the (inclusive) upper 
bound to check the column against.
+                */
+               public Between(String columnName, PredicateLeaf.Type 
literalType, Serializable lowerBound, Serializable upperBound) {
+                       super(columnName, literalType);
+                       this.lowerBound = lowerBound;
+                       this.upperBound = upperBound;
+               }
+
+               @Override
+               protected SearchArgument.Builder add(SearchArgument.Builder 
builder) {
+                       return builder.between(columnName, literalType, 
castLiteral(lowerBound), castLiteral(upperBound));
+               }
+
+               @Override
+               public String toString() {
+                       return lowerBound + " <= " + columnName + " <= " + 
upperBound;
+               }
+       }
+
+       /**
+        * An IN predicate that can be evaluated by the OrcRowInputFormat.
+        */
+       public static class In extends ColumnPredicate {
+               private Serializable[] literals;
+
+               /**
+                * Creates an IN predicate.
+                *
+                * @param columnName The column to check.
+                * @param literalType The type of the literals.
+                * @param literals The literal values to check the column 
against.
+                */
+               public In(String columnName, PredicateLeaf.Type literalType, 
Serializable... literals) {
+                       super(columnName, literalType);
+                       this.literals = literals;
+               }
+
+               @Override
+               protected SearchArgument.Builder add(SearchArgument.Builder 
builder) {
+                       Object[] castedLiterals = new Object[literals.length];
+                       for (int i = 0; i < literals.length; i++) {
+                               castedLiterals[i] = castLiteral(literals[i]);
+                       }
+                       return builder.in(columnName, literalType, (Object[]) 
castedLiterals);
+               }
+
+               @Override
+               public String toString() {
+                       return columnName + " IN " + Arrays.toString(literals);
+               }
+       }
+
+       /**
+        * A NOT predicate to negate a predicate that can be evaluated by the 
OrcRowInputFormat.
+        */
+       public static class Not extends Predicate {
+               private final Predicate pred;
+
+               /**
+                * Creates a NOT predicate.
+                *
+                * @param predicate The predicate to negate.
+                */
+               public Not(Predicate predicate) {
+                       this.pred = predicate;
+               }
+
+               protected SearchArgument.Builder add(SearchArgument.Builder 
builder) {
+                       return pred.add(builder.startNot()).end();
+               }
+
+               protected Predicate child() {
+                       return pred;
+               }
+
+               @Override
+               public String toString() {
+                       return "NOT(" + pred.toString() + ")";
+               }
+       }
+
+       /**
+        * An OR predicate that can be evaluated by the OrcRowInputFormat.
+        */
+       public static class Or extends Predicate {
+               private final Predicate[] preds;
+
+               /**
+                * Creates an OR predicate.
+                *
+                * @param predicates The disjunctive predicates.
+                */
+               public Or(Predicate... predicates) {
+                       this.preds = predicates;
+               }
+
+               @Override
+               protected SearchArgument.Builder add(SearchArgument.Builder 
builder) {
+                       SearchArgument.Builder withOr = builder.startOr();
+                       for (Predicate p : preds) {
+                               withOr = p.add(withOr);
+                       }
+                       return withOr.end();
+               }
+
+               protected Iterable<Predicate> children() {
+                       return Arrays.asList(preds);
+               }
+
+               @Override
+               public String toString() {
+                       return "OR(" + Arrays.toString(preds) + ")";
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
 
b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
index 0454ba4..b7c5378 100644
--- 
a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
+++ 
b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcTableSource.java
@@ -18,111 +18,474 @@
 
 package org.apache.flink.orc;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.orc.OrcRowInputFormat.Predicate;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.expressions.Attribute;
+import org.apache.flink.table.expressions.BinaryComparison;
+import org.apache.flink.table.expressions.EqualTo;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.GreaterThan;
+import org.apache.flink.table.expressions.GreaterThanOrEqual;
+import org.apache.flink.table.expressions.IsNotNull;
+import org.apache.flink.table.expressions.IsNull;
+import org.apache.flink.table.expressions.LessThan;
+import org.apache.flink.table.expressions.LessThanOrEqual;
+import org.apache.flink.table.expressions.Literal;
+import org.apache.flink.table.expressions.Not;
+import org.apache.flink.table.expressions.NotEqualTo;
+import org.apache.flink.table.expressions.Or;
+import org.apache.flink.table.expressions.UnaryExpression;
 import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.table.sources.FilterableTableSource;
 import org.apache.flink.table.sources.ProjectableTableSource;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.orc.TypeDescription;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
 /**
- * Creates a TableSource to read ORC file.
+ * A TableSource to read ORC files.
  *
- * <p>The ORC file path and schema is passed during {@link OrcTableSource} 
construction. configuration is optional.
+ * <p>The {@link OrcTableSource} supports projection and filter push-down.</p>
  *
- * <p>The OrcTableSource is used as shown in the example below.
+ * <p>An {@link OrcTableSource} is used as shown in the example below.
  *
  * <pre>
  * {@code
- * String path = testInputURL.getPath();
- * String schema = "struct<col1:boolean,col2:tinyint,col3:smallint,col4:int>"
- * OrcTableSource orcSrc = new OrcTableSource(path, schema);
+ * OrcTableSource orcSrc = OrcTableSource.builder()
+ *   .path("file:///my/data/file.orc")
+ *   .forOrcSchema("struct<col1:boolean,col2:tinyint,col3:smallint,col4:int>")
+ *   .build();
+ *
  * tEnv.registerTableSource("orcTable", orcSrc);
  * Table res = tableEnv.sql("SELECT * FROM orcTable");
  * }
  * </pre>
  */
-public class OrcTableSource implements BatchTableSource<Row>, 
ProjectableTableSource<Row> {
+public class OrcTableSource
+       implements BatchTableSource<Row>, ProjectableTableSource<Row>, 
FilterableTableSource<Row> {
 
-       private String path;
-       private TypeDescription orcSchema;
-       private RowTypeInfo typeInfo;
-       private Configuration orcConfig;
-       private int[] fieldMapping;
+       private static final int DEFAULT_BATCH_SIZE = 1000;
 
-       /**
-        * The ORC file path and schema.
-        *
-        * @param path      the path of orc file
-        * @param orcSchema schema of orc file
-        */
-       public OrcTableSource(String path, String orcSchema) {
-               this(path, orcSchema, new Configuration());
-       }
+       // path to read ORC files from
+       private final String path;
+       // schema of the ORC file
+       private final TypeDescription orcSchema;
+       // the schema of the Table
+       private final TableSchema tableSchema;
+       // the configuration to read the file
+       private final Configuration orcConfig;
+       // the number of rows to read in a batch
+       private final int batchSize;
+
+       // type information of the data returned by the InputFormat
+       private final RowTypeInfo typeInfo;
+       // list of selected ORC fields to return
+       private final int[] selectedFields;
+       // list of predicates to apply
+       private final Predicate[] predicates;
 
        /**
-        * The file path and schema of orc file, and configuration to read orc 
file .
+        * Creates an OrcTableSouce from an ORC TypeDescription.
         *
-        * @param path      the path of orc file
-        * @param orcSchema schema of orc file
-        * @param orcConfig configuration to read orc file
+        * @param path          The path to read the ORC files from.
+        * @param orcSchema The schema of the ORC files as TypeDescription.
+        * @param orcConfig The configuration to read the ORC files.
+        * @param batchSize The number of Rows to read in a batch, default is 
1000.
         */
-       public OrcTableSource(String path, String orcSchema, Configuration 
orcConfig) {
-               this(path, TypeDescription.fromString(orcSchema), orcConfig);
+       private OrcTableSource(String path, TypeDescription orcSchema, 
Configuration orcConfig, int batchSize) {
+               this(path, orcSchema, orcConfig, batchSize, null, null);
        }
 
-       public OrcTableSource(String path, TypeDescription orcSchema, 
Configuration orcConfig) {
+       private OrcTableSource(String path, TypeDescription orcSchema, 
Configuration orcConfig,
+                                                       int batchSize, int[] 
selectedFields, Predicate[] predicates) {
+
+               Preconditions.checkNotNull(path, "Path must not be null.");
+               Preconditions.checkNotNull(orcSchema, "OrcSchema must not be 
null.");
+               Preconditions.checkNotNull(path, "Configuration must not be 
null.");
+               Preconditions.checkArgument(batchSize > 0, "Batch size must be 
larger than null.");
                this.path = path;
                this.orcSchema = orcSchema;
                this.orcConfig = orcConfig;
+               this.batchSize = batchSize;
+               this.selectedFields = selectedFields;
+               this.predicates = predicates;
 
-               this.typeInfo = (RowTypeInfo) 
OrcUtils.schemaToTypeInfo(this.orcSchema);
+               // determine the type information from the ORC schema
+               RowTypeInfo typeInfoFromSchema = (RowTypeInfo) 
OrcUtils.schemaToTypeInfo(this.orcSchema);
+
+               // set return type info
+               if (selectedFields == null) {
+                       this.typeInfo = typeInfoFromSchema;
+               } else {
+                       this.typeInfo = 
RowTypeInfo.projectFields(typeInfoFromSchema, selectedFields);
+               }
 
+               // create a TableSchema that corresponds to the ORC schema
+               this.tableSchema = new TableSchema(
+                       typeInfoFromSchema.getFieldNames(),
+                       typeInfoFromSchema.getFieldTypes()
+               );
        }
 
        @Override
        public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
-
-               RowOrcInputFormat orcIF = new RowOrcInputFormat(path, 
orcSchema, orcConfig);
-               if (fieldMapping != null) {
-                       orcIF.setFieldMapping(fieldMapping);
+               OrcRowInputFormat orcIF = buildOrcInputFormat();
+               if (selectedFields != null) {
+                       orcIF.selectFields(selectedFields);
+               }
+               if (predicates != null) {
+                       for (OrcRowInputFormat.Predicate pred : predicates) {
+                               orcIF.addPredicate(pred);
+                       }
                }
                return execEnv.createInput(orcIF);
        }
 
+       @VisibleForTesting
+       protected OrcRowInputFormat buildOrcInputFormat() {
+               return new OrcRowInputFormat(path, orcSchema, orcConfig, 
batchSize);
+       }
+
        @Override
        public TypeInformation<Row> getReturnType() {
                return typeInfo;
        }
 
        @Override
-       public TableSource<Row> projectFields(int[] fields) {
+       public TableSchema getTableSchema() {
+               return this.tableSchema;
+       }
 
-               OrcTableSource copy = new OrcTableSource(path, orcSchema, 
orcConfig);
+       @Override
+       public TableSource<Row> projectFields(int[] selectedFields) {
+               // create a copy of the OrcTableSouce with new selected fields
+               return new OrcTableSource(path, orcSchema, orcConfig, 
batchSize, selectedFields, predicates);
+       }
 
-               // set field mapping
-               copy.fieldMapping = fields;
+       @Override
+       public TableSource<Row> applyPredicate(List<Expression> predicates) {
+               ArrayList<Predicate> orcPredicates = new ArrayList<>();
 
-               // adapt TypeInfo
-               TypeInformation[] fieldTypes = new 
TypeInformation[fields.length];
-               String[] fieldNames = new String[fields.length];
-               for (int i = 0; i < fields.length; i++) {
-                       fieldTypes[i] = this.typeInfo.getTypeAt(fields[i]);
-                       fieldNames[i] = 
this.typeInfo.getFieldNames()[fields[i]];
+               // we do not remove any predicates from the list because ORC 
does not fully apply predicates
+               for (Expression pred : predicates) {
+                       Predicate orcPred = toOrcPredicate(pred);
+                       if (orcPred != null) {
+                               orcPredicates.add(orcPred);
+                       }
                }
-               copy.typeInfo = new RowTypeInfo(fieldTypes, fieldNames);
 
-               return copy;
+               return new OrcTableSource(path, orcSchema, orcConfig, 
batchSize, selectedFields, orcPredicates.toArray(new Predicate[]{}));
+       }
+
+       @Override
+       public boolean isFilterPushedDown() {
+               return this.predicates != null;
        }
 
        @Override
        public String explainSource() {
-               return "ORC Source file at path " + this.path + " with schema " 
+ this.orcSchema;
+               return "OrcFile[path=" + path + ", schema=" + orcSchema + ", 
filter=" + predicateString() + "]";
+       }
+
+       private String predicateString() {
+               if (predicates != null) {
+                       return "AND(" + Arrays.toString(predicates) + ")";
+               } else {
+                       return "TRUE";
+               }
+       }
+
+       // Predicate conversion for filter push-down.
+
+       private Predicate toOrcPredicate(Expression pred) {
+               if (pred instanceof Or) {
+                       Predicate c1 = toOrcPredicate(((Or) pred).left());
+                       Predicate c2 = toOrcPredicate(((Or) pred).right());
+                       if (c1 == null || c2 == null) {
+                               return null;
+                       } else {
+                               return new OrcRowInputFormat.Or(c1, c2);
+                       }
+               } else if (pred instanceof Not) {
+                       Predicate c = toOrcPredicate(((Not) pred).child());
+                       if (c == null) {
+                               return null;
+                       } else {
+                               return new OrcRowInputFormat.Not(c);
+                       }
+               } else if (pred instanceof BinaryComparison) {
+
+                       BinaryComparison binComp = (BinaryComparison) pred;
+
+                       if (!isValid(binComp)) {
+                               // not a valid predicate
+                               return null;
+                       }
+                       PredicateLeaf.Type litType = getLiteralType(binComp);
+                       if (litType == null) {
+                               // unsupported literal type
+                               return null;
+                       }
+
+                       boolean literalOnRight = literalOnRight(binComp);
+                       String colName = getColumnName(binComp);
+                       Serializable literal = (Serializable) 
getLiteral(binComp);
+
+                       if (pred instanceof EqualTo) {
+                               return new OrcRowInputFormat.Equals(colName, 
litType, literal);
+                       } else if (pred instanceof NotEqualTo) {
+                               return new OrcRowInputFormat.Not(
+                                       new OrcRowInputFormat.Equals(colName, 
litType, literal));
+                       } else if (pred instanceof GreaterThan) {
+                               if (literalOnRight) {
+                                       return new OrcRowInputFormat.Not(
+                                               new 
OrcRowInputFormat.LessThanEquals(colName, litType, literal));
+                               } else {
+                                       return new 
OrcRowInputFormat.LessThan(colName, litType, literal);
+                               }
+                       } else if (pred instanceof GreaterThanOrEqual) {
+                               if (literalOnRight) {
+                                       return new OrcRowInputFormat.Not(
+                                               new 
OrcRowInputFormat.LessThan(colName, litType, literal));
+                               } else {
+                                       return new 
OrcRowInputFormat.LessThanEquals(colName, litType, literal);
+                               }
+                       } else if (pred instanceof LessThan) {
+                               if (literalOnRight) {
+                                       return new 
OrcRowInputFormat.LessThan(colName, litType, literal);
+                               } else {
+                                       return new OrcRowInputFormat.Not(
+                                               new 
OrcRowInputFormat.LessThanEquals(colName, litType, literal));
+                               }
+                       } else if (pred instanceof LessThanOrEqual) {
+                               if (literalOnRight) {
+                                       return new 
OrcRowInputFormat.LessThanEquals(colName, litType, literal);
+                               } else {
+                                       return new OrcRowInputFormat.Not(
+                                               new 
OrcRowInputFormat.LessThan(colName, litType, literal));
+                               }
+                       } else {
+                               // unsupported predicate
+                               return null;
+                       }
+               } else if (pred instanceof UnaryExpression) {
+
+                       UnaryExpression unary = (UnaryExpression) pred;
+                       if (!isValid(unary)) {
+                               // not a valid predicate
+                               return null;
+                       }
+                       PredicateLeaf.Type colType = 
toOrcType(((UnaryExpression) pred).child().resultType());
+                       if (colType == null) {
+                               // unsupported type
+                               return null;
+                       }
+
+                       String colName = getColumnName(unary);
+
+                       if (pred instanceof IsNull) {
+                               return new OrcRowInputFormat.IsNull(colName, 
colType);
+                       } else if (pred instanceof IsNotNull) {
+                               return new OrcRowInputFormat.Not(
+                                       new OrcRowInputFormat.IsNull(colName, 
colType));
+                       } else {
+                               // unsupported predicate
+                               return null;
+                       }
+               } else {
+                       // unsupported predicate
+                       return null;
+               }
+       }
+
+       private boolean isValid(UnaryExpression unary) {
+               return unary.child() instanceof Attribute;
+       }
+
+       private boolean isValid(BinaryComparison comp) {
+               return (comp.left() instanceof Literal && comp.right() 
instanceof Attribute) ||
+                       (comp.left() instanceof Attribute && comp.right() 
instanceof Literal);
+       }
+
+       private boolean literalOnRight(BinaryComparison comp) {
+               if (comp.left() instanceof Literal && comp.right() instanceof 
Attribute) {
+                       return false;
+               } else if (comp.left() instanceof Attribute && comp.right() 
instanceof Literal) {
+                       return true;
+               } else {
+                       throw new RuntimeException("Invalid binary 
comparison.");
+               }
+       }
+
+       private String getColumnName(UnaryExpression unary) {
+               return ((Attribute) unary.child()).name();
+       }
+
+       private String getColumnName(BinaryComparison comp) {
+               if (literalOnRight(comp)) {
+                       return ((Attribute) comp.left()).name();
+               } else {
+                       return ((Attribute) comp.right()).name();
+               }
+       }
+
+       private PredicateLeaf.Type getLiteralType(BinaryComparison comp) {
+               if (literalOnRight(comp)) {
+                       return toOrcType(((Literal) comp.right()).resultType());
+               } else {
+                       return toOrcType(((Literal) comp.left()).resultType());
+               }
+       }
+
+       private Object getLiteral(BinaryComparison comp) {
+               if (literalOnRight(comp)) {
+                       return ((Literal) comp.right()).value();
+               } else {
+                       return ((Literal) comp.left()).value();
+               }
+       }
+
+       private PredicateLeaf.Type toOrcType(TypeInformation<?> type) {
+               if (type == BasicTypeInfo.BYTE_TYPE_INFO ||
+                       type == BasicTypeInfo.SHORT_TYPE_INFO ||
+                       type == BasicTypeInfo.INT_TYPE_INFO ||
+                       type == BasicTypeInfo.LONG_TYPE_INFO) {
+                       return PredicateLeaf.Type.LONG;
+               } else if (type == BasicTypeInfo.FLOAT_TYPE_INFO ||
+                       type == BasicTypeInfo.DOUBLE_TYPE_INFO) {
+                       return PredicateLeaf.Type.FLOAT;
+               } else if (type == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+                       return PredicateLeaf.Type.BOOLEAN;
+               } else if (type == BasicTypeInfo.STRING_TYPE_INFO) {
+                       return PredicateLeaf.Type.STRING;
+               } else if (type == SqlTimeTypeInfo.TIMESTAMP) {
+                       return PredicateLeaf.Type.TIMESTAMP;
+               } else if (type == SqlTimeTypeInfo.DATE) {
+                       return PredicateLeaf.Type.DATE;
+               } else if (type == BasicTypeInfo.BIG_DEC_TYPE_INFO) {
+                       return PredicateLeaf.Type.DECIMAL;
+               } else {
+                       // unsupported type
+                       return null;
+               }
+       }
+
+       // Builder
+
+       public static Builder builder() {
+               return new Builder();
+       }
+
+       /**
+        * Constructs an {@link OrcTableSource}.
+        */
+       public static class Builder {
+
+               private String path;
+
+               private TypeDescription schema;
+
+               private Configuration config;
+
+               private int batchSize = 0;
+
+               /**
+                * Sets the path of the ORC file(s).
+                *
+                * @param path The path of the ORC file(s).
+                * @return The builder.
+                */
+               public Builder path(String path) {
+                       Preconditions.checkNotNull(path, "Path must not be 
null.");
+                       Preconditions.checkArgument(!path.isEmpty(), "Path must 
not be empty.");
+                       this.path = path;
+                       return this;
+               }
+
+               /**
+                * Sets the ORC schema of the files to read as a String.
+                *
+                * @param orcSchema The ORC schema of the files to read as a 
String.
+                * @return The builder.
+                */
+               public Builder forOrcSchema(String orcSchema) {
+                       Preconditions.checkNotNull(orcSchema, "ORC schema must 
not be null.");
+                       this.schema = TypeDescription.fromString(orcSchema);
+                       return this;
+               }
+
+               /**
+                * Sets the ORC schema of the files to read as a {@link 
TypeDescription}.
+                *
+                * @param orcSchema The ORC schema of the files to read as a 
String.
+                * @return The builder.
+                */
+               public Builder forOrcSchema(TypeDescription orcSchema) {
+                       Preconditions.checkNotNull(orcSchema, "ORC Schema must 
not be null.");
+                       this.schema = orcSchema;
+                       return this;
+               }
+
+               /**
+                * Sets a Hadoop {@link Configuration} for the ORC reader. If 
no configuration is configured,
+                * an empty configuration is used.
+                *
+                * @param config The Hadoop Configuration for the ORC reader.
+                * @return The builder.
+                */
+               public Builder withConfiguration(Configuration config) {
+                       Preconditions.checkNotNull(config, "Configuration must 
not be null.");
+                       this.config = config;
+                       return this;
+               }
+
+               /**
+                * Sets the number of rows that are read in a batch. If not 
configured, the ORC files are
+                * read with a batch size of 1000.
+                *
+                * @param batchSize The number of rows that are read in a batch.
+                * @return The builder.
+                */
+               public Builder withBatchSize(int batchSize) {
+                       Preconditions.checkArgument(batchSize > 0, "Batch size 
must be greater than zero.");
+                       this.batchSize = batchSize;
+                       return this;
+               }
+
+               /**
+                * Builds the OrcTableSource for this builder.
+                *
+                * @return The OrcTableSource for this builder.
+                */
+               public OrcTableSource build() {
+                       Preconditions.checkNotNull(this.path, "Path must not be 
null.");
+                       Preconditions.checkNotNull(this.schema, "ORC schema 
must not be null.");
+                       if (this.config == null) {
+                               this.config = new Configuration();
+                       }
+                       if (this.batchSize == 0) {
+                               // set default batch size
+                               this.batchSize = DEFAULT_BATCH_SIZE;
+                       }
+                       return new OrcTableSource(this.path, this.schema, 
this.config, this.batchSize);
+               }
+
        }
 
 }

Reply via email to