[FLINK-7548] [table] Refactor TableSource interface.

- Improves the time attribute handling of table sources.

This commit also resolves:
- FLINK-6870: Unified handling of time attributes in batch and streaming
- FLINK-7179: Support for projection push down and watermark assigners
- FLINK-7696: Support for projection push down and time attributes

This closes #4894.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9a2ba6e0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9a2ba6e0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9a2ba6e0

Branch: refs/heads/master
Commit: 9a2ba6e058e907aef65e0af8731ca5ec8733712e
Parents: 760f0bc
Author: Fabian Hueske <[email protected]>
Authored: Thu Oct 5 17:44:35 2017 +0200
Committer: Fabian Hueske <[email protected]>
Committed: Mon Oct 30 14:09:27 2017 +0100

----------------------------------------------------------------------
 docs/dev/table/sourceSinks.md                   | 114 ++--
 .../connectors/kafka/KafkaTableSource.java      |   6 +
 .../flink/addons/hbase/HBaseTableSource.java    |  50 +-
 .../flink/table/api/BatchTableEnvironment.scala |   4 +-
 .../table/api/StreamTableEnvironment.scala      |  34 +-
 .../flink/table/api/TableEnvironment.scala      |  25 +-
 .../apache/flink/table/api/TableSchema.scala    |  48 +-
 .../table/catalog/ExternalTableSourceUtil.scala |  16 +-
 .../flink/table/codegen/CodeGenerator.scala     |  56 +-
 .../flink/table/plan/nodes/CommonScan.scala     |  50 +-
 .../plan/nodes/PhysicalTableSourceScan.scala    |  18 +-
 .../table/plan/nodes/dataset/BatchScan.scala    |  68 ++-
 .../nodes/dataset/BatchTableSourceScan.scala    |  65 ++-
 .../table/plan/nodes/dataset/DataSetScan.scala  |   8 +-
 .../plan/nodes/datastream/DataStreamScan.scala  |   3 +-
 .../plan/nodes/datastream/StreamScan.scala      | 112 ++--
 .../datastream/StreamTableSourceScan.scala      | 131 ++++-
 .../logical/FlinkLogicalTableSourceScan.scala   |  37 +-
 .../dataSet/BatchTableSourceScanRule.scala      |   3 +-
 .../datastream/StreamTableSourceScanRule.scala  |   3 +-
 .../PushFilterIntoTableSourceScanRule.scala     |   2 +-
 .../PushProjectIntoTableSourceScanRule.scala    |  46 +-
 .../plan/schema/BatchTableSourceTable.scala     |  43 ++
 .../flink/table/plan/schema/DataSetTable.scala  |   3 +-
 .../table/plan/schema/DataStreamTable.scala     |   4 +-
 .../flink/table/plan/schema/FlinkTable.scala    |  93 ---
 .../flink/table/plan/schema/InlineTable.scala   | 115 ++++
 .../plan/schema/StreamTableSourceTable.scala    | 135 +----
 .../table/plan/schema/TableSourceTable.scala    |  30 +-
 .../flink/table/plan/stats/FlinkStatistic.scala |   4 +-
 .../flink/table/sources/CsvTableSource.scala    |  96 +++-
 .../table/sources/DefinedFieldMapping.scala     |  55 ++
 .../flink/table/sources/DefinedFieldNames.scala |  35 --
 .../flink/table/sources/FieldComputer.scala     |  63 ++
 .../NestedFieldsProjectableTableSource.scala    |  40 +-
 .../table/sources/ProjectableTableSource.scala  |  21 +-
 .../flink/table/sources/TableSource.scala       |  40 +-
 .../flink/table/sources/TableSourceUtil.scala   | 522 +++++++++++++++++
 .../table/sources/definedTimeAttributes.scala   |  77 +--
 .../table/sources/timestampExtractors.scala     |  77 +++
 .../table/sources/watermarkStrategies.scala     | 101 ++++
 .../table/typeutils/TimeIndicatorTypeInfo.scala |   7 +-
 .../flink/table/api/ExternalCatalogTest.scala   |  10 +-
 .../flink/table/api/TableSourceTest.scala       |  36 +-
 .../api/stream/table/TableSourceTest.scala      | 331 +++++++----
 .../validation/TableSourceValidationTest.scala  |  59 --
 .../validation/FlinkTableValidationTest.scala   |  39 --
 .../validation/InlineTableValidationTest.scala  |  39 ++
 .../validation/TableSourceValidationTest.scala  | 244 +++++---
 .../catalog/ExternalTableSourceUtilTest.scala   |   5 +-
 .../catalog/InMemoryExternalCatalogTest.scala   |  11 +-
 .../runtime/batch/table/TableSourceITCase.scala | 571 ++++++++++++++++++-
 .../runtime/stream/TimeAttributesITCase.scala   |  17 +-
 .../stream/table/TableSourceITCase.scala        | 523 ++++++++++++++++-
 .../table/runtime/utils/CommonTestData.scala    |   2 +
 .../table/utils/TestFilterableTableSource.scala | 135 -----
 .../table/utils/TestTableSourceWithTime.scala   |  52 --
 .../flink/table/utils/testTableSources.scala    | 304 ++++++++++
 58 files changed, 3637 insertions(+), 1201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/docs/dev/table/sourceSinks.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index f77625b..d5bb874 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -298,17 +298,19 @@ Similar to using <code>JDBCOutputFormat</code>, you have 
to explicitly specify t
 Define a TableSource
 --------------------
 
-A `TableSource` is a generic interface to access to data stored in an external 
system as a table. It produces a `DataSet` or `DataStream` and provides the 
type information to derive the schema of the generated table. There are 
different table sources for batch tables and streaming tables.
+A `TableSource` is a generic interface that gives Table API and SQL queries access to data stored in an external system. It provides the schema of the table as well as the records, which are mapped to rows with the table's schema. Depending on whether the `TableSource` is used in a streaming or batch query, the records are produced as a `DataStream` or `DataSet`. 
 
-Schema information consists of a data type, field names, and corresponding 
indexes of these names in the data type.
+If a `TableSource` is used in a streaming query, it must implement the `StreamTableSource` interface; if it is used in a batch query, it must implement the `BatchTableSource` interface. A `TableSource` can also implement both interfaces and be used in both streaming and batch queries. 
 
-The general interface looks as follows:
+`StreamTableSource` and `BatchTableSource` extend the base interface 
`TableSource` that defines the following methods:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 TableSource<T> {
 
+  public TableSchema getTableSchema();
+
   public TypeInformation<T> getReturnType();
 
   public String explainSource();
@@ -320,6 +322,8 @@ TableSource<T> {
 {% highlight scala %}
 TableSource[T] {
 
+  def getTableSchema: TableSchema
+
   def getReturnType: TypeInformation[T]
 
   def explainSource: String
@@ -329,15 +333,19 @@ TableSource[T] {
 </div>
 </div>
 
-To define a `TableSource` one needs to implement `TableSource#getReturnType`. 
In this case field names and field indexes are derived from the returned type.
+* `getTableSchema()`: Returns the schema of the table, i.e., the names and 
types of the fields of the table. The field types are defined using Flink's 
`TypeInformation` (see [Table API types](tableApi.html#data-types) and [SQL 
types](sql.html#data-types)).
 
-If the `TypeInformation` returned by `getReturnType` does not allow to specify 
custom field names, it is possible to implement the `DefinedFieldNames` 
interface in addition.
+* `getReturnType()`: Returns the physical type of the `DataStream` (`StreamTableSource`) or `DataSet` (`BatchTableSource`) and of the records that are produced by the `TableSource`.
 
-### BatchTableSource
+* `explainSource()`: Returns a String that describes the `TableSource`. This 
method is optional and used for display purposes only.
 
-Defines an external `TableSource` to create a batch table and provides access 
to its data.
+The `TableSource` interface separates the logical table schema from the physical type of the returned `DataStream` or `DataSet`. As a consequence, all fields of the table schema (`getTableSchema()`) must be mapped to a field of corresponding type in the physical return type (`getReturnType()`). By default, this mapping is done based on field names. For example, a `TableSource` that defines a table schema with two fields `[name: String, size: Integer]` requires a `TypeInformation` with at least two fields called `name` and `size` of type `String` and `Integer`, respectively. This could be a `PojoTypeInfo` or a `RowTypeInfo` that has two fields named `name` and `size` with matching types. 
 
-The interface looks as follows:
+However, some types, such as Tuple or CaseClass types, have fixed field names (e.g., `f0`, `f1` for Tuples). If a `TableSource` returns a `DataStream` or `DataSet` of a type with fixed field names, it can implement the `DefinedFieldMapping` interface to map field names from the table schema to field names of the physical return type.
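
For illustration, a minimal Scala sketch of such a source, assuming the `DefinedFieldMapping` contract exposes a `getFieldMapping()` method that returns a map from table schema field names to physical field names (class and field names are illustrative):

{% highlight scala %}
import java.util.{HashMap => JHashMap, Map => JMap}

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.{TableSchema, Types}
import org.apache.flink.table.sources.{DefinedFieldMapping, TableSource}
import org.apache.flink.types.Row

// A source whose physical field names differ from the table schema.
// A real source would also implement StreamTableSource or BatchTableSource.
abstract class PersonSource extends TableSource[Row] with DefinedFieldMapping {

  // logical schema: [name: String, size: Integer]
  override def getTableSchema: TableSchema =
    new TableSchema(Array("name", "size"), Array(Types.STRING, Types.INT))

  // physical type with differently named fields
  override def getReturnType: TypeInformation[Row] =
    new RowTypeInfo(
      Array[TypeInformation[_]](Types.STRING, Types.INT),
      Array("f_name", "f_size"))

  // map table schema fields to physical fields
  override def getFieldMapping: JMap[String, String] = {
    val mapping = new JHashMap[String, String]()
    mapping.put("name", "f_name")
    mapping.put("size", "f_size")
    mapping
  }
}
{% endhighlight %}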
+
+### Defining a BatchTableSource
+
+The `BatchTableSource` interface extends the `TableSource` interface and 
defines one additional method:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -359,20 +367,20 @@ BatchTableSource[T] extends TableSource[T] {
 </div>
 </div>
 
-{% top %}
+* `getDataSet(execEnv)`: Returns a `DataSet` with the data of the table. The type of the `DataSet` must be identical to the return type defined by the `TableSource.getReturnType()` method. The `DataSet` can be created using a regular [data source]({{ site.baseurl }}/dev/batch/#data-sources) of the DataSet API. Commonly, a `BatchTableSource` is implemented by wrapping an `InputFormat` or [batch connector]({{ site.baseurl }}/dev/batch/connectors.html).
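
A minimal sketch of a `BatchTableSource` that parses lines of a text file (path and record layout are hypothetical):

{% highlight scala %}
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.{TableSchema, Types}
import org.apache.flink.table.sources.BatchTableSource
import org.apache.flink.types.Row

// Reads "name,size" lines from a text file.
class CsvPersonSource(path: String) extends BatchTableSource[Row] {

  private val returnType = new RowTypeInfo(
    Array[TypeInformation[_]](Types.STRING, Types.INT),
    Array("name", "size"))

  override def getReturnType: TypeInformation[Row] = returnType

  // schema and physical type are identical here
  override def getTableSchema: TableSchema = TableSchema.fromTypeInfo(returnType)

  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
    execEnv.readTextFile(path)
      .map(new MapFunction[String, Row] {
        override def map(line: String): Row = {
          val fields = line.split(",")
          Row.of(fields(0), Integer.valueOf(fields(1)))
        }
      })
      .returns(returnType)  // must match getReturnType()
  }
}
{% endhighlight %}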
 
-### StreamTableSource
+{% top %}
 
-Defines an external `TableSource` to create a streaming table and provides 
access to its data.
+### Defining a StreamTableSource
 
-The interface looks as follows:
+The `StreamTableSource` interface extends the `TableSource` interface and 
defines one additional method: 
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 StreamTableSource<T> extends TableSource<T> {
 
-  public DataSet<T> getDataStream(StreamExecutionEnvironment execEnv);
+  public DataStream<T> getDataStream(StreamExecutionEnvironment execEnv);
 }
 {% endhighlight %}
 </div>
@@ -381,84 +389,85 @@ StreamTableSource<T> extends TableSource<T> {
 {% highlight scala %}
 StreamTableSource[T] extends TableSource[T] {
 
-  def getDataStream(execEnv: StreamExecutionEnvironment): DataSet[T]
+  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
 }
 {% endhighlight %}
 </div>
 </div>
 
-**IMPORTANT:** Time-based operations on streaming tables such as windows 
require explicitly specified [time attributes]({{ site.baseurl 
}}/dev/table/streaming.html#time-attributes) (both for the [Table 
API](tableApi.html#group-windows) and [SQL](sql.html#group-windows)). A 
`StreamTableSource` defines 
+* `getDataStream(execEnv)`: Returns a `DataStream` with the data of the table. The type of the `DataStream` must be identical to the return type defined by the `TableSource.getReturnType()` method. The `DataStream` can be created using a regular [data source]({{ site.baseurl }}/dev/datastream_api.html#data-sources) of the DataStream API. Commonly, a `StreamTableSource` is implemented by wrapping a `SourceFunction` or a [stream connector]({{ site.baseurl }}/dev/connectors/).
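
A minimal sketch of a `StreamTableSource`, analogous to the batch sketch above but wrapping a socket source (host, port, and record layout are hypothetical):

{% highlight scala %}
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.{TableSchema, Types}
import org.apache.flink.table.sources.StreamTableSource
import org.apache.flink.types.Row

// Reads "name,size" records from a socket.
class SocketPersonSource(host: String, port: Int) extends StreamTableSource[Row] {

  private val returnType = new RowTypeInfo(
    Array[TypeInformation[_]](Types.STRING, Types.INT),
    Array("name", "size"))

  override def getReturnType: TypeInformation[Row] = returnType

  override def getTableSchema: TableSchema = TableSchema.fromTypeInfo(returnType)

  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    execEnv.socketTextStream(host, port)
      .map(new MapFunction[String, Row] {
        override def map(line: String): Row = {
          val fields = line.split(",")
          Row.of(fields(0), Integer.valueOf(fields(1)))
        }
      })
      .returns(returnType)  // must match getReturnType()
  }
}
{% endhighlight %}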
+
+### Defining a TableSource with Time Attributes
 
-- an *event-time attribute* by implementing the `DefinedRowtimeAttribute` 
interface and
-- a *processing-time attribute* by implementing the `DefinedProctimeAttribute` 
interface.
+Time-based operations of streaming [Table API](tableApi.html#group-windows) 
and [SQL](sql.html#group-windows) queries, such as windowed aggregations or 
joins, require explicitly specified [time attributes]({{ site.baseurl 
}}/dev/table/streaming.html#time-attributes). 
 
-Both are described in the following sections.
+A `TableSource` defines a time attribute as a field of type 
`Types.SQL_TIMESTAMP` in its table schema. In contrast to all regular fields in 
the schema, a time attribute must not be matched to a physical field in the 
return type of the table source. Instead, a `TableSource` defines a time 
attribute by implementing a certain interface.
 
-#### DefinedRowtimeAttribute
+#### Defining a Processing Time Attribute
 
-The `DefinedRowtimeAttribute` interface provides a single method.
+A `TableSource` defines a [processing time 
attribute](streaming.html#processing-time) by implementing the 
`DefinedProctimeAttribute` interface. The interface looks as follows:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-DefinedRowtimeAttribute {
+DefinedProctimeAttribute {
 
-  public String getRowtimeAttribute();
+  public String getProctimeAttribute();
 }
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-DefinedRowtimeAttribute {
+DefinedProctimeAttribute {
 
-  def getRowtimeAttribute(): String
+  def getProctimeAttribute: String
 }
 {% endhighlight %}
 </div>
 </div>
 
-The `getRowtimeAttribute()` method returns the name of the field that holds 
the event-time timestamps for the rows of the table. The field must exist in 
the schema of the `StreamTableSource` and be of type `LONG` or `TIMESTAMP`. 
Moreover, the `DataStream` returned by `StreamTableSource.getDataStream()` must 
have [watermarks]({{ site.baseurl }}/dev/event_timestamps_watermarks.html) 
assigned which are aligned with the values of the specified timestamp field. 
-
-Please see the documentation on [timestamp and watermark assignment]({{ 
site.baseurl }}/dev/event_timestamps_watermarks.html) for details on how to 
assign watermarks. Please note that the timestamps of a `DataStream` (the ones 
which are assigned by a `TimestampAssigner`) are ignored. Only the values of 
the `TableSource`'s rowtime field are relevant.
+* `getProctimeAttribute()`: Returns the name of the processing time attribute. The specified attribute must be defined with type `Types.SQL_TIMESTAMP` in the table schema and can be used in time-based operations. A `DefinedProctimeAttribute` table source can indicate that it does not define a processing time attribute by returning `null`.
 
-**Note:** A `TableSource` that returns a rowtime attribute does not support 
projection pushdown.
+**Note** Both `StreamTableSource` and `BatchTableSource` can implement `DefinedProctimeAttribute` and define a processing time attribute. In the case of a `BatchTableSource`, the processing time field is initialized with the current timestamp during the table scan.
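
For illustration, a sketch of a table source that appends a processing time attribute `ptime` to its schema (all names are illustrative; the remaining `TableSource` methods are omitted):

{% highlight scala %}
import org.apache.flink.table.api.{TableSchema, Types}
import org.apache.flink.table.sources.{DefinedProctimeAttribute, StreamTableSource}
import org.apache.flink.types.Row

// getReturnType() and getDataStream() are omitted; the physical type
// contains only [user, amount], the proctime field is generated.
abstract class OrderSource extends StreamTableSource[Row] with DefinedProctimeAttribute {

  // the proctime attribute is part of the table schema ...
  override def getTableSchema: TableSchema =
    new TableSchema(
      Array("user", "amount", "ptime"),
      Array(Types.STRING, Types.INT, Types.SQL_TIMESTAMP))

  // ... but is not backed by a field of the physical return type
  override def getProctimeAttribute: String = "ptime"
}
{% endhighlight %}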
 
-#### DefinedProctimeAttribute
+#### Defining a Rowtime Attribute
 
-The `DefinedProctimeAttribute` interface provides a single method.
+A `TableSource` defines a [rowtime attribute](streaming.html#event-time) by 
implementing the `DefinedRowtimeAttributes` interface. The interface looks as 
follows:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-DefinedProctimeAttribute {
+DefinedRowtimeAttributes {
 
-  public String getProctimeAttribute();
+  public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors();
 }
 {% endhighlight %}
 </div>
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-DefinedProctimeAttribute {
+DefinedRowtimeAttributes {
 
-  def getProctimeAttribute(): String
+  def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor]
 }
 {% endhighlight %}
 </div>
 </div>
 
-The `getProctimeAttribute()` method returns the name of a field that is 
appended to each row returned by the `StreamTableSource`. The appended field 
serves as a processing time timestamp and can be used in time-based operations.
+* `getRowtimeAttributeDescriptors()`: Returns a list of `RowtimeAttributeDescriptor`. A `RowtimeAttributeDescriptor` describes a rowtime attribute with the following properties (illustrated in the sketch below):
+  * `attributeName`: The name of the rowtime attribute in the table schema. The field must be defined with type `Types.SQL_TIMESTAMP`.
+  * `timestampExtractor`: The timestamp extractor extracts the timestamp from a record of the return type. For example, it can convert a Long field into a timestamp or parse a String-encoded timestamp. Flink comes with a set of built-in `TimestampExtractor` implementations for common use cases. It is also possible to provide a custom implementation.
+  * `watermarkStrategy`: The watermark strategy defines how watermarks are generated for the rowtime attribute. Flink comes with a set of built-in `WatermarkStrategy` implementations for common use cases. It is also possible to provide a custom implementation.
+
+**Note** Although the `getRowtimeAttributeDescriptors()` method returns a list of descriptors, only a single rowtime attribute is supported at the moment. We plan to remove this restriction in the future and support tables with more than one rowtime attribute.
 
-**Note:** A `TableSource` that returns a processing time attribute does not 
support projection pushdown.
+**IMPORTANT** Both `StreamTableSource` and `BatchTableSource` can implement `DefinedRowtimeAttributes` and define a rowtime attribute. In either case, the rowtime field is extracted using the `TimestampExtractor`. Hence, a `TableSource` that implements both `StreamTableSource` and `BatchTableSource` and defines a rowtime attribute provides exactly the same data to streaming and batch queries.
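
For illustration, a hedged sketch of a rowtime definition. The `ExistingField` extractor and `AscendingTimestamps` strategy are assumed to be among the built-in implementations introduced with this change, and the descriptor's constructor order is an assumption as well:

{% highlight scala %}
import java.util.Collections

import org.apache.flink.table.api.{TableSchema, Types}
import org.apache.flink.table.sources.{DefinedRowtimeAttributes, RowtimeAttributeDescriptor, StreamTableSource}
// assumed names of a built-in extractor and watermark strategy:
import org.apache.flink.table.sources.{AscendingTimestamps, ExistingField}
import org.apache.flink.types.Row

// getReturnType() and getDataStream() are omitted; the physical type is
// assumed to contain a Long field "ts" with the event timestamp.
abstract class SensorSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {

  override def getTableSchema: TableSchema =
    new TableSchema(
      Array("id", "temp", "rtime"),
      Array(Types.STRING, Types.DOUBLE, Types.SQL_TIMESTAMP))

  override def getRowtimeAttributeDescriptors: java.util.List[RowtimeAttributeDescriptor] =
    Collections.singletonList(
      new RowtimeAttributeDescriptor(
        "rtime",                   // rowtime attribute in the table schema
        new ExistingField("ts"),   // extract the timestamp from physical field "ts"
        new AscendingTimestamps))  // generate watermarks for ascending timestamps
}
{% endhighlight %}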
 
 {% top %}
 
-### ProjectableTableSource
+### Defining a TableSource with Projection Push-Down
 
-The `ProjectableTableSource` interface adds support for projection push-down 
to a `TableSource`. A `TableSource` extending this interface is able to project 
the fields of the return table.
-
-The interface looks as follows:
+A `TableSource` supports projection push-down by implementing the 
`ProjectableTableSource` interface. The interface defines a single method:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -474,21 +483,15 @@ ProjectableTableSource<T> {
 {% highlight scala %}
 ProjectableTableSource[T] {
 
-  def TableSource[T] projectFields(fields: Array[Int])
+  def projectFields(fields: Array[Int]): TableSource[T]
 }
 {% endhighlight %}
 </div>
 </div>
 
-The `projectFields()` is called with an array that holds the indexes of the 
required fields. The method returns a new `TableSource` object that returns 
rows with the requested schema.
-
-{% top %}
+* `projectFields(fields)`: Returns a *copy* of the `TableSource` with an adjusted physical return type. The `fields` parameter holds the indexes of the fields that must be provided by the `TableSource`. The indexes relate to the `TypeInformation` of the physical return type, *not* to the logical table schema. The copied `TableSource` must adjust its return type and the returned `DataStream` or `DataSet`. The `TableSchema` of the copied `TableSource` must not be changed, i.e., it must be the same as that of the original `TableSource`. If the `TableSource` implements the `DefinedFieldMapping` interface, the field mapping must be adjusted to the new return type.
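
A sketch of this contract for a source whose physical type is a `RowTypeInfo`; the class and the `copy` helper are hypothetical:

{% highlight scala %}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.TableSchema
import org.apache.flink.table.sources.{BatchTableSource, ProjectableTableSource, TableSource}
import org.apache.flink.types.Row

// fullType is the unprojected physical type; selected holds the pushed-down indexes.
abstract class MyProjectableSource(fullType: RowTypeInfo, selected: Array[Int])
  extends BatchTableSource[Row] with ProjectableTableSource[Row] {

  // the physical return type is narrowed to the selected fields ...
  override def getReturnType: TypeInformation[Row] = {
    val types = selected.map(i => fullType.getTypeAt(i).asInstanceOf[TypeInformation[_]])
    val names = selected.map(i => fullType.getFieldNames()(i))
    new RowTypeInfo(types, names)
  }

  // ... while the table schema must stay unchanged
  override def getTableSchema: TableSchema = TableSchema.fromTypeInfo(fullType)

  override def projectFields(fields: Array[Int]): TableSource[Row] =
    copy(fields)  // return a copy; never mutate this instance

  // hypothetical helper that also narrows the produced DataSet
  protected def copy(fields: Array[Int]): MyProjectableSource
}
{% endhighlight %}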
 
-### NestedFieldsProjectableTableSource
-
-The `NestedFieldsProjectableTableSource` interface adds support for projection 
push-down to a `TableSource` with nested fields. A `TableSource` extending this 
interface is able to project the nested fields of the returned table.
-
-The interface looks as follows:
+The `ProjectableTableSource` interface adds support for projecting flat fields. If the `TableSource` defines a table with a nested schema, it can implement the `NestedFieldsProjectableTableSource` interface to extend the projection to nested fields. The `NestedFieldsProjectableTableSource` interface is defined as follows:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -510,9 +513,13 @@ NestedFieldsProjectableTableSource[T] {
 </div>
 </div>
 
-### FilterableTableSource
+* `projectNestedFields(fields, nestedFields)`: Returns a *copy* of the `TableSource` with an adjusted physical return type. Fields of the physical return type may be removed or reordered but their types must not be changed. The contract of this method is essentially the same as for the `ProjectableTableSource.projectFields()` method. In addition, the `nestedFields` parameter contains, for each field index in the `fields` list, a list of paths to all nested fields that are accessed by the query (see the sketch below). All other nested fields do not need to be read, parsed, and set in the records that are produced by the `TableSource`. **IMPORTANT** The types of the projected fields must not be changed, but unused fields may be set to null or to a default value.
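
For intuition, a sketch of the arguments such a push-down might produce, assuming a hypothetical `source` with the shown physical type and a `"*"` marker for fully accessed fields (the exact path notation is an assumption):

{% highlight scala %}
import org.apache.flink.table.sources.NestedFieldsProjectableTableSource
import org.apache.flink.types.Row

// physical type: Row(id: Long, person: Row(name: String, age: Int, address: Row(...)))
val source: NestedFieldsProjectableTableSource[Row] = ???

// a query that reads id, person.name, and person.age could push down:
val fields = Array(0, 1)            // indexes of "id" and "person"
val nestedFields = Array(
  Array("*"),                       // "id" is fully accessed
  Array("name", "age"))             // only these fields of "person" are accessed

// the copied source may skip reading "person.address" entirely
val projected = source.projectNestedFields(fields, nestedFields)
{% endhighlight %}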
 
-The `FilterableTableSource` interface adds support for filtering push-down to 
a `TableSource`. A `TableSource` extending this interface is able to filter 
records before returning.
+{% top %}
+
+### Defining a TableSource with Filter Push-Down
+
+The `FilterableTableSource` interface adds support for filter push-down to a `TableSource`. A `TableSource` extending this interface is able to filter records before they are emitted, such that the returned `DataStream` or `DataSet` contains fewer records.
 
 The interface looks as follows:
 
@@ -540,9 +547,8 @@ FilterableTableSource[T] {
 </div>
 </div>
 
-The optimizer pushes predicates down by calling the `applyPredicate()` method. 
The `TableSource` can evaluate which predicates to evaluate by itself and which 
to leave for the framework. Predicates which are evaluated by the `TableSource` 
must be removed from the `List`. All predicates which remain in the `List` 
after the method call returns are evaluated by the framework. The 
`applyPredicate()` method returns a new `TableSource` that evaluates all 
selected predicates.
-
-The `isFilterPushedDown()` method tells the optimizer whether predicates have 
been pushed down or not.
+* `applyPredicate(predicates)`: Returns a *copy* of the `TableSource` with added predicates. The `predicates` parameter is a mutable list of conjunctive predicates that are "offered" to the `TableSource`. The `TableSource` accepts a predicate for evaluation by removing it from the list. Predicates that are left in the list will be evaluated by a subsequent filter operator.
+* `isFilterPushedDown()`: Returns true if the `applyPredicate()` method was called before. Hence, `isFilterPushedDown()` must return true for all `TableSource` instances returned from an `applyPredicate()` call.
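
A minimal sketch of this contract, assuming hypothetical `canEvaluate` and `copy` helpers (a real source would also implement `StreamTableSource` or `BatchTableSource` and evaluate the accepted predicates while reading):

{% highlight scala %}
import java.util.{List => JList}

import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.sources.{FilterableTableSource, TableSource}
import org.apache.flink.types.Row

abstract class MyFilterableSource(
    acceptedPredicates: Seq[Expression],
    pushedDown: Boolean)
  extends FilterableTableSource[Row] {

  override def applyPredicate(predicates: JList[Expression]): TableSource[Row] = {
    val accepted = Seq.newBuilder[Expression]
    val it = predicates.iterator()
    while (it.hasNext) {
      val p = it.next()
      if (canEvaluate(p)) {
        accepted += p
        it.remove()  // accepted predicates are removed from the mutable list
      }
      // predicates left in the list are evaluated by a subsequent filter operator
    }
    // return a copy; the planner relies on the original staying unmodified
    copy(accepted.result(), pushedDown = true)
  }

  override def isFilterPushedDown: Boolean = pushedDown

  // hypothetical helpers
  protected def canEvaluate(predicate: Expression): Boolean
  protected def copy(accepted: Seq[Expression], pushedDown: Boolean): TableSource[Row]
}
{% endhighlight %}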
 
 {% top %}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
index 8969f90..a9cf235 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
@@ -84,6 +85,11 @@ public abstract class KafkaTableSource implements 
StreamTableSource<Row> {
                return typeInfo;
        }
 
+       @Override
+       public TableSchema getTableSchema() {
+               return TableSchema.fromTypeInfo(typeInfo);
+       }
+
        /**
         * Returns the version-specific Kafka consumer.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
 
b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
index cc7e602..eb0fc9b 100644
--- 
a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
+++ 
b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sources.BatchTableSource;
 import org.apache.flink.table.sources.ProjectableTableSource;
 import org.apache.flink.types.Row;
@@ -58,7 +59,8 @@ public class HBaseTableSource implements 
BatchTableSource<Row>, ProjectableTable
 
        private Configuration conf;
        private String tableName;
-       private HBaseTableSchema schema;
+       private HBaseTableSchema hBaseSchema;
+       private TableSchema tableSchema;
 
        /**
         * The HBase configuration and the name of the table to read.
@@ -69,7 +71,14 @@ public class HBaseTableSource implements 
BatchTableSource<Row>, ProjectableTable
        public HBaseTableSource(Configuration conf, String tableName) {
                this.conf = conf;
                this.tableName = Preconditions.checkNotNull(tableName, "Table  
name");
-               this.schema = new HBaseTableSchema();
+               this.hBaseSchema = new HBaseTableSchema();
+       }
+
+       private HBaseTableSource(Configuration conf, String tableName, 
TableSchema tableSchema) {
+               this.conf = conf;
+               this.tableName = Preconditions.checkNotNull(tableName, "Table  
name");
+               this.hBaseSchema = new HBaseTableSchema();
+               this.tableSchema = tableSchema;
        }
 
        /**
@@ -80,7 +89,7 @@ public class HBaseTableSource implements 
BatchTableSource<Row>, ProjectableTable
         * @param clazz     the data type of the qualifier
         */
        public void addColumn(String family, String qualifier, Class<?> clazz) {
-               this.schema.addColumn(family, qualifier, clazz);
+               this.hBaseSchema.addColumn(family, qualifier, clazz);
        }
 
        /**
@@ -89,34 +98,51 @@ public class HBaseTableSource implements 
BatchTableSource<Row>, ProjectableTable
         * @param charset Name of the charset to use.
         */
        public void setCharset(String charset) {
-               this.schema.setCharset(charset);
+               this.hBaseSchema.setCharset(charset);
        }
 
        @Override
        public TypeInformation<Row> getReturnType() {
-               String[] famNames = schema.getFamilyNames();
-               TypeInformation<?>[] typeInfos = new 
TypeInformation[famNames.length];
+               return new RowTypeInfo(getFieldTypes(), getFieldNames());
+       }
+
+       @Override
+       public TableSchema getTableSchema() {
+               if (this.tableSchema == null) {
+                       return new TableSchema(getFieldNames(), 
getFieldTypes());
+               } else {
+                       return this.tableSchema;
+               }
+       }
+
+       private String[] getFieldNames() {
+               return hBaseSchema.getFamilyNames();
+       }
+
+       private TypeInformation[] getFieldTypes() {
+               String[] famNames = hBaseSchema.getFamilyNames();
+               TypeInformation<?>[] fieldTypes = new 
TypeInformation[hBaseSchema.getFamilyNames().length];
                int i = 0;
                for (String family : famNames) {
-                       typeInfos[i] = new 
RowTypeInfo(schema.getQualifierTypes(family), schema.getQualifierNames(family));
+                       fieldTypes[i] = new 
RowTypeInfo(hBaseSchema.getQualifierTypes(family), 
hBaseSchema.getQualifierNames(family));
                        i++;
                }
-               return new RowTypeInfo(typeInfos, famNames);
+               return fieldTypes;
        }
 
        @Override
        public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
-               return execEnv.createInput(new HBaseRowInputFormat(conf, 
tableName, schema), getReturnType());
+               return execEnv.createInput(new HBaseRowInputFormat(conf, 
tableName, hBaseSchema), getReturnType());
        }
 
        @Override
        public HBaseTableSource projectFields(int[] fields) {
-               String[] famNames = schema.getFamilyNames();
-               HBaseTableSource newTableSource = new 
HBaseTableSource(this.conf, tableName);
+               String[] famNames = hBaseSchema.getFamilyNames();
+               HBaseTableSource newTableSource = new 
HBaseTableSource(this.conf, tableName, getTableSchema().copy());
                // Extract the family from the given fields
                for (int field : fields) {
                        String family = famNames[field];
-                       Map<String, TypeInformation<?>> familyInfo = 
schema.getFamilyInfo(family);
+                       Map<String, TypeInformation<?>> familyInfo = 
hBaseSchema.getFamilyInfo(family);
                        for (String qualifier : familyInfo.keySet()) {
                                // create the newSchema
                                newTableSource.addColumn(family, qualifier, 
familyInfo.get(qualifier).getTypeClass());

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index bca5826..8a78ac4 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -36,7 +36,7 @@ import org.apache.flink.table.expressions.{Expression, 
TimeAttribute}
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.dataset.DataSetRel
 import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.{DataSetTable, RowSchema, 
TableSinkTable, TableSourceTable}
+import org.apache.flink.table.plan.schema.{BatchTableSourceTable, 
DataSetTable, RowSchema, TableSinkTable}
 import org.apache.flink.table.runtime.MapRunner
 import org.apache.flink.table.sinks.{BatchTableSink, TableSink}
 import org.apache.flink.table.sources.{BatchTableSource, TableSource}
@@ -98,7 +98,7 @@ abstract class BatchTableEnvironment(
 
     tableSource match {
       case batchTableSource: BatchTableSource[_] =>
-        registerTableInternal(name, new TableSourceTable(batchTableSource))
+        registerTableInternal(name, new 
BatchTableSourceTable(batchTableSource))
       case _ =>
         throw new TableException("Only BatchTableSource can be registered in " 
+
             "BatchTableEnvironment")

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index e42beae..e80acca 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -48,7 +48,7 @@ import org.apache.flink.table.plan.schema.{DataStreamTable, 
RowSchema, StreamTab
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.table.runtime.{CRowMapRunner, 
OutputRowtimeProcessFunction}
 import org.apache.flink.table.sinks._
-import org.apache.flink.table.sources.{DefinedRowtimeAttribute, 
StreamTableSource, TableSource}
+import org.apache.flink.table.sources.{StreamTableSource, TableSource, 
TableSourceUtil}
 import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TypeCheckUtils}
 
 import _root_.scala.collection.JavaConverters._
@@ -109,23 +109,19 @@ abstract class StreamTableEnvironment(
   override def registerTableSource(name: String, tableSource: TableSource[_]): 
Unit = {
     checkValidTableName(name)
 
-    // check if event-time is enabled
-    tableSource match {
-      case dra: DefinedRowtimeAttribute if dra.getRowtimeAttribute != null &&
-          execEnv.getStreamTimeCharacteristic != TimeCharacteristic.EventTime 
=>
-
-        throw TableException(
-          s"A rowtime attribute requires an EventTime time characteristic in 
stream environment. " +
-            s"But is: ${execEnv.getStreamTimeCharacteristic}")
-      case _ => // ok
-    }
-
     tableSource match {
       case streamTableSource: StreamTableSource[_] =>
+        // check that event-time is enabled if table source includes rowtime 
attributes
+        if (TableSourceUtil.hasRowtimeAttribute(streamTableSource) &&
+          execEnv.getStreamTimeCharacteristic != TimeCharacteristic.EventTime) 
{
+            throw TableException(
+              s"A rowtime attribute requires an EventTime time characteristic 
in stream " +
+                s"environment. But is: ${execEnv.getStreamTimeCharacteristic}")
+        }
         registerTableInternal(name, new 
StreamTableSourceTable(streamTableSource))
       case _ =>
         throw new TableException("Only StreamTableSource can be registered in 
" +
-            "StreamTableEnvironment")
+          "StreamTableEnvironment")
     }
   }
 
@@ -554,14 +550,18 @@ abstract class StreamTableEnvironment(
 
     // inject rowtime field
     val withRowtime = rowtime match {
-      case Some(rt) => fieldIndexes.patch(rt._1, 
Seq(TimeIndicatorTypeInfo.ROWTIME_MARKER), 0)
-      case _ => fieldIndexes
+      case Some(rt) =>
+        fieldIndexes.patch(rt._1, 
Seq(TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER), 0)
+      case _ =>
+        fieldIndexes
     }
 
     // inject proctime field
     val withProctime = proctime match {
-      case Some(pt) => withRowtime.patch(pt._1, 
Seq(TimeIndicatorTypeInfo.PROCTIME_MARKER), 0)
-      case _ => withRowtime
+      case Some(pt) =>
+        withRowtime.patch(pt._1, 
Seq(TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER), 0)
+      case _ =>
+        withRowtime
     }
 
     withProctime

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 7e360f9..fec7f1a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -57,7 +57,7 @@ import org.apache.flink.table.plan.logical.{CatalogNode, 
LogicalRelNode}
 import org.apache.flink.table.plan.rules.FlinkRuleSets
 import org.apache.flink.table.plan.schema.{RelTable, RowSchema, TableSinkTable}
 import org.apache.flink.table.sinks.TableSink
-import org.apache.flink.table.sources.{DefinedFieldNames, TableSource}
+import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 import org.apache.flink.table.validate.FunctionCatalog
 import org.apache.flink.types.Row
@@ -1171,27 +1171,4 @@ object TableEnvironment {
     }
   }
 
-  /**
-    * Returns field names for a given [[TableSource]].
-    *
-    * @param tableSource The TableSource to extract field names from.
-    * @tparam A The type of the TableSource.
-    * @return An array holding the field names.
-    */
-  def getFieldNames[A](tableSource: TableSource[A]): Array[String] = 
tableSource match {
-      case d: DefinedFieldNames => d.getFieldNames
-      case _ => TableEnvironment.getFieldNames(tableSource.getReturnType)
-    }
-
-  /**
-    * Returns field indices for a given [[TableSource]].
-    *
-    * @param tableSource The TableSource to extract field indices from.
-    * @tparam A The type of the TableSource.
-    * @return An array holding the field indices.
-    */
-  def getFieldIndices[A](tableSource: TableSource[A]): Array[Int] = 
tableSource match {
-    case d: DefinedFieldNames => d.getFieldIndices
-    case _ => TableEnvironment.getFieldIndices(tableSource.getReturnType)
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
index 6ee65f0..f7ef710 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
@@ -18,6 +18,7 @@
 package org.apache.flink.table.api
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
 
 /**
   * A TableSchema represents a Table's structure.
@@ -51,6 +52,13 @@ class TableSchema(
   val columnNameToIndex: Map[String, Int] = columnNames.zipWithIndex.toMap
 
   /**
+    * Returns a deep copy of the TableSchema.
+    */
+  def copy: TableSchema = {
+    new TableSchema(columnNames.clone(), columnTypes.clone())
+  }
+
+  /**
     * Returns all type information as an array.
     */
   def getTypes: Array[TypeInformation[_]] = columnTypes
@@ -99,7 +107,7 @@ class TableSchema(
     }
   }
 
-  override def toString = {
+  override def toString: String = {
     val builder = new StringBuilder
     builder.append("root\n")
     columnNames.zip(columnTypes).foreach{ case (name, typeInfo) =>
@@ -108,4 +116,42 @@ class TableSchema(
     builder.toString()
   }
 
+  override def equals(other: Any): Boolean = {
+    other match {
+      case that: TableSchema if that.canEqual(this) =>
+        this.columnNames.deep == that.columnNames.deep &&
+        this.columnTypes.deep == that.columnTypes.deep
+      case _ => false
+    }
+  }
+
+  def canEqual(other: Any): Boolean = other.isInstanceOf[TableSchema]
+
+}
+
+object TableSchema {
+
+  /**
+    * Creates a [[TableSchema]] from a [[TypeInformation]].
+    * If the [[TypeInformation]] is a [[CompositeType]], the fieldnames and 
types for the composite
+    * type are used to construct the [[TableSchema]].
+    * Otherwise, a [[TableSchema]] with a single field is created. The field 
name is "f0" and the
+    * field type the provided type.
+    *
+    * @param typeInfo The [[TypeInformation]] from which the [[TableSchema]] 
is generated.
+    * @return The [[TableSchema]] that was generated from the given 
[[TypeInformation]].
+    */
+  def fromTypeInfo(typeInfo: TypeInformation[_]): TableSchema = {
+    typeInfo match {
+      case c: CompositeType[_] =>
+        // get field names and types from composite type
+        val fieldNames = c.getFieldNames
+        val fieldTypes = 
fieldNames.map(c.getTypeAt).asInstanceOf[Array[TypeInformation[_]]]
+        new TableSchema(fieldNames, fieldTypes)
+      case t: TypeInformation[_] =>
+        // create table schema with a single field named "f0" of the given 
type.
+        new TableSchema(Array("f0"), Array(t))
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
index 6bacac1..d2e297f 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
@@ -23,10 +23,10 @@ import java.net.URL
 import org.apache.commons.configuration.{ConfigurationException, 
ConversionException, PropertiesConfiguration}
 import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.table.annotation.TableType
-import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, 
NoMatchedTableSourceConverterException}
-import org.apache.flink.table.plan.schema.{StreamTableSourceTable, 
TableSourceTable}
+import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, 
NoMatchedTableSourceConverterException, TableException}
+import org.apache.flink.table.plan.schema.{BatchTableSourceTable, 
StreamTableSourceTable, TableSourceTable}
 import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.sources.{StreamTableSource, TableSource}
+import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, 
TableSource}
 import org.apache.flink.table.util.Logging
 import org.apache.flink.util.InstantiationUtil
 import org.reflections.Reflections
@@ -124,9 +124,15 @@ object ExternalTableSourceUtil extends Logging {
           } else {
             FlinkStatistic.UNKNOWN
           }
+
           convertedTableSource match {
-            case s : StreamTableSource[_] => new StreamTableSourceTable(s, 
flinkStatistic)
-            case _ => new TableSourceTable(convertedTableSource, 
flinkStatistic)
+            case s: StreamTableSource[_] =>
+              new StreamTableSourceTable(s, flinkStatistic)
+            case b: BatchTableSource[_] =>
+              new BatchTableSourceTable(b, flinkStatistic)
+            case _ =>
+              throw new TableException("Unknown TableSource type.")
+
           }
         }
       case None =>

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 5e7ec32..be7f84f 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -37,11 +37,10 @@ import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenUtils._
 import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
-import org.apache.flink.table.codegen.calls.FunctionGenerator
+import org.apache.flink.table.codegen.calls.{CurrentTimePointCallGen, 
FunctionGenerator}
 import org.apache.flink.table.codegen.calls.ScalarOperators._
 import org.apache.flink.table.functions.sql.{ProctimeSqlFunction, 
ScalarSqlFunctions}
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
-
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction}
 import org.apache.flink.table.typeutils.TypeCheckUtils._
@@ -242,44 +241,39 @@ abstract class CodeGenerator(
     *
     * @param returnType conversion target type. Inputs and output must have 
the same arity.
     * @param resultFieldNames result field names necessary for a mapping to 
POJO fields.
+    * @param rowtimeExpression an optional expression to extract the value of 
a rowtime field from
+    *                          the input data. If not set, the value of 
rowtime field is set to the
+    *                          StreamRecord timestamp.
     * @return instance of GeneratedExpression
     */
   def generateConverterResultExpression(
       returnType: TypeInformation[_ <: Any],
-      resultFieldNames: Seq[String])
+      resultFieldNames: Seq[String],
+      rowtimeExpression: Option[RexNode] = None)
     : GeneratedExpression = {
 
     val input1AccessExprs = input1Mapping.map {
-      case TimeIndicatorTypeInfo.ROWTIME_MARKER =>
-        // attribute is a rowtime indicator. Access event-time timestamp in 
StreamRecord.
-        generateRowtimeAccess()
-      case TimeIndicatorTypeInfo.PROCTIME_MARKER =>
+      case TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER |
+           TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER =>
+        // attribute is a rowtime indicator.
+        rowtimeExpression match {
+          case Some(expr) =>
+            // generate rowtime attribute from expression
+            generateExpression(expr)
+          case _ =>
+            // fetch rowtime attribute from StreamRecord timestamp field
+            generateRowtimeAccess()
+        }
+      case TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER =>
         // attribute is proctime indicator.
         // we use a null literal and generate a timestamp when we need it.
         generateNullLiteral(TimeIndicatorTypeInfo.PROCTIME_INDICATOR)
+      case TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER =>
+        // attribute is proctime field in a batch query.
+        // it is initialized with the current time.
+        generateCurrentTimestamp()
       case idx =>
-        // get type of result field
-        val outIdx = input1Mapping.indexOf(idx)
-        val outType = returnType match {
-          case pt: PojoTypeInfo[_] => pt.getTypeAt(resultFieldNames(outIdx))
-          case ct: CompositeType[_] => ct.getTypeAt(outIdx)
-          case t: TypeInformation[_] => t
-        }
-        val inputAccess = generateInputAccess(input1, input1Term, idx)
-        // Change output type to rowtime indicator
-        if (FlinkTypeFactory.isRowtimeIndicatorType(outType) &&
-          (inputAccess.resultType == Types.LONG || inputAccess.resultType == 
Types.SQL_TIMESTAMP)) {
-          // This case is required for TableSources that implement 
DefinedRowtimeAttribute.
-          // Hard cast possible because LONG, TIMESTAMP, and ROWTIME_INDICATOR 
are internally
-          // represented as Long.
-          GeneratedExpression(
-            inputAccess.resultTerm,
-            inputAccess.nullTerm,
-            inputAccess.code,
-            TimeIndicatorTypeInfo.ROWTIME_INDICATOR)
-        } else {
-          inputAccess
-        }
+        generateInputAccess(input1, input1Term, idx)
     }
 
     val input2AccessExprs = input2 match {
@@ -1332,6 +1326,10 @@ abstract class CodeGenerator(
     GeneratedExpression(resultTerm, NEVER_NULL, resultCode, 
SqlTimeTypeInfo.TIMESTAMP)
   }
 
+  private[flink] def generateCurrentTimestamp(): GeneratedExpression = {
+    new CurrentTimePointCallGen(Types.SQL_TIMESTAMP, false).generate(this, 
Seq())
+  }
+
   // 
----------------------------------------------------------------------------------------------
   // Reusable code snippets
   // 
----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
index 996d62c..252b748 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala
@@ -18,55 +18,7 @@
 
 package org.apache.flink.table.plan.nodes
 
-import org.apache.flink.api.common.functions.Function
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.codegen.{FunctionCodeGenerator, 
GeneratedFunction}
-import org.apache.flink.types.Row
-
 /**
   * Common class for batch and stream scans.
   */
-trait CommonScan[T] {
-
-  /**
-    * We check if the input type is exactly the same as the internal row type.
-    * A conversion is necessary if types differ.
-    */
-  private[flink] def needsConversion(
-      externalTypeInfo: TypeInformation[Any],
-      internalTypeInfo: TypeInformation[T]): Boolean =
-    externalTypeInfo != internalTypeInfo
-
-  private[flink] def generatedConversionFunction[F <: Function](
-      config: TableConfig,
-      functionClass: Class[F],
-      inputType: TypeInformation[Any],
-      expectedType: TypeInformation[Row],
-      conversionOperatorName: String,
-      fieldNames: Seq[String],
-      inputFieldMapping: Option[Array[Int]] = None)
-    : GeneratedFunction[F, Row] = {
-
-    val generator = new FunctionCodeGenerator(
-      config,
-      false,
-      inputType,
-      None,
-      inputFieldMapping)
-    val conversion = generator.generateConverterResultExpression(expectedType, 
fieldNames)
-
-    val body =
-      s"""
-         |${conversion.code}
-         |return ${conversion.resultTerm};
-         |""".stripMargin
-
-    generator.generateFunction(
-      conversionOperatorName,
-      functionClass,
-      body,
-      expectedType)
-  }
-
-}
+trait CommonScan[T]

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
index 5872d8c..c9b4243 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
@@ -22,9 +22,10 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptTable, 
RelTraitSet}
 import org.apache.calcite.rel.RelWriter
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.TableScan
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.sources.TableSource
+import org.apache.flink.table.plan.schema.{BatchTableSourceTable, 
StreamTableSourceTable, TableSourceTable}
+import org.apache.flink.table.sources.{TableSource, TableSourceUtil}
 
 import scala.collection.JavaConverters._
 
@@ -32,14 +33,19 @@ abstract class PhysicalTableSourceScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
-    val tableSource: TableSource[_])
+    val tableSource: TableSource[_],
+    val selectedFields: Option[Array[Int]])
   extends TableScan(cluster, traitSet, table) {
 
   override def deriveRowType(): RelDataType = {
     val flinkTypeFactory = 
cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    flinkTypeFactory.buildLogicalRowType(
-      TableEnvironment.getFieldNames(tableSource),
-      TableEnvironment.getFieldTypes(tableSource.getReturnType))
+    val streamingTable = table.unwrap(classOf[TableSourceTable[_]]) match {
+      case _: StreamTableSourceTable[_] => true
+      case _: BatchTableSourceTable[_] => false
+      case t => throw TableException(s"Unknown Table type ${t.getClass}.")
+    }
+
+    TableSourceUtil.getRelDataType(tableSource, selectedFields, 
streamingTable, flinkTypeFactory)
   }
 
   override def explainTerms(pw: RelWriter): RelWriter = {

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
index 95707b8..85152d8 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
@@ -18,48 +18,52 @@
 
 package org.apache.flink.table.plan.nodes.dataset
 
+import org.apache.calcite.rex.RexNode
 import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.{FunctionCodeGenerator, 
GeneratedFunction}
 import org.apache.flink.table.plan.nodes.CommonScan
-import org.apache.flink.table.plan.schema.FlinkTable
+import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.MapRunner
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 import org.apache.flink.types.Row
 
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-
 trait BatchScan extends CommonScan[Row] with DataSetRel {
 
   protected def convertToInternalRow(
+      schema: RowSchema,
       input: DataSet[Any],
-      flinkTable: FlinkTable[_],
-      config: TableConfig)
-    : DataSet[Row] = {
+      fieldIdxs: Array[Int],
+      config: TableConfig,
+      rowtimeExpression: Option[RexNode]): DataSet[Row] = {
 
     val inputType = input.getType
+    val internalType = schema.typeInfo
 
-    val internalType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
+    val hasTimeIndicator = fieldIdxs.exists(f =>
+      f == TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER ||
+      f == TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER)
 
     // conversion
-    if (needsConversion(inputType, internalType)) {
+    if (inputType != internalType || hasTimeIndicator) {
 
-      val function = generatedConversionFunction(
+      val function = generateConversionMapper(
         config,
-        classOf[MapFunction[Any, Row]],
         inputType,
         internalType,
         "DataSetSourceConversion",
-        getRowType.getFieldNames,
-        Some(flinkTable.fieldIndexes))
+        schema.fieldNames,
+        fieldIdxs,
+        rowtimeExpression)
 
       val runner = new MapRunner[Any, Row](
         function.name,
         function.code,
         function.returnType)
 
-      val opName = s"from: 
(${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
+      val opName = s"from: (${schema.fieldNames.mkString(", ")})"
 
       input.map(runner).name(opName)
     }
@@ -68,4 +72,38 @@ trait BatchScan extends CommonScan[Row] with DataSetRel {
       input.asInstanceOf[DataSet[Row]]
     }
   }
+
+  private def generateConversionMapper(
+      config: TableConfig,
+      inputType: TypeInformation[Any],
+      outputType: TypeInformation[Row],
+      conversionOperatorName: String,
+      fieldNames: Seq[String],
+      inputFieldMapping: Array[Int],
+      rowtimeExpression: Option[RexNode]): GeneratedFunction[MapFunction[Any, 
Row], Row] = {
+
+    val generator = new FunctionCodeGenerator(
+      config,
+      false,
+      inputType,
+      None,
+      Some(inputFieldMapping))
+
+    val conversion = generator.generateConverterResultExpression(
+      outputType,
+      fieldNames,
+      rowtimeExpression)
+
+    val body =
+      s"""
+         |${conversion.code}
+         |return ${conversion.resultTerm};
+         |""".stripMargin
+
+    generator.generateFunction(
+      "DataSetSourceConversion",
+      classOf[MapFunction[Any, Row]],
+      body,
+      outputType)
+  }
 }
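
Note on the marker-based field mapping used above: physical fields map to their position in the input type, while time attributes are encoded as negative marker values so that the generated mapper knows to materialize them during conversion. A minimal sketch, with an assumed logical schema (a, b, proctime) over a physical input (a: Int, b: Long); the field names are illustrative and not part of this change:

import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo

// hypothetical index mapping for the logical schema (a, b, proctime)
val fieldIdxs: Array[Int] = Array(
  0,                                            // "a" -> physical field 0
  1,                                            // "b" -> physical field 1
  TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER)  // "proctime" -> generated

// the marker makes hasTimeIndicator true, so a conversion mapper is generated
// even if the physical input type already equals the internal row type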

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
index 74aac43..d09d69c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
@@ -20,13 +20,15 @@ package org.apache.flink.table.plan.nodes.dataset
 
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.RexNode
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.table.api.{BatchTableEnvironment, TableEnvironment}
+import org.apache.flink.table.api.{BatchTableEnvironment, TableException, Types}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
-import org.apache.flink.table.plan.schema.TableSourceTable
-import org.apache.flink.table.sources.{BatchTableSource, TableSource}
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.sources._
 import org.apache.flink.types.Row
 
 /** Flink RelNode to read data from an external source defined by a [[BatchTableSource]]. */
@@ -34,15 +36,18 @@ class BatchTableSourceScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
-    tableSource: BatchTableSource[_])
-  extends PhysicalTableSourceScan(cluster, traitSet, table, tableSource)
+    tableSource: BatchTableSource[_],
+    selectedFields: Option[Array[Int]])
+  extends PhysicalTableSourceScan(cluster, traitSet, table, tableSource, selectedFields)
   with BatchScan {
 
-  override def deriveRowType() = {
+  override def deriveRowType(): RelDataType = {
     val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    flinkTypeFactory.buildLogicalRowType(
-      TableEnvironment.getFieldNames(tableSource),
-      TableEnvironment.getFieldTypes(tableSource.getReturnType))
+    TableSourceUtil.getRelDataType(
+      tableSource,
+      selectedFields,
+      streaming = false,
+      flinkTypeFactory)
   }
 
   override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
@@ -55,26 +60,58 @@ class BatchTableSourceScan(
       cluster,
       traitSet,
       getTable,
-      tableSource
+      tableSource,
+      selectedFields
     )
   }
 
   override def copy(
       traitSet: RelTraitSet,
-      newTableSource: TableSource[_])
-    : PhysicalTableSourceScan = {
+      newTableSource: TableSource[_]): PhysicalTableSourceScan = {
 
     new BatchTableSourceScan(
       cluster,
       traitSet,
       getTable,
-      newTableSource.asInstanceOf[BatchTableSource[_]]
+      newTableSource.asInstanceOf[BatchTableSource[_]],
+      selectedFields
     )
   }
 
   override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
+
+    val fieldIndexes = TableSourceUtil.computeIndexMapping(
+      tableSource,
+      isStreamTable = false,
+      selectedFields)
+
     val config = tableEnv.getConfig
     val inputDataSet = tableSource.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]]
-    convertToInternalRow(inputDataSet, new TableSourceTable(tableSource), config)
+    val outputSchema = new RowSchema(this.getRowType)
+
+    // check that declared and actual type of table source DataSet are identical
+    if (inputDataSet.getType != tableSource.getReturnType) {
+      throw new TableException(s"TableSource of type ${tableSource.getClass.getCanonicalName} " +
+        s"returned a DataSet of type ${inputDataSet.getType} that does not match with the " +
+        s"type ${tableSource.getReturnType} declared by the TableSource.getReturnType() method. " +
+        s"Please validate the implementation of the TableSource.")
+    }
+
+    // get expression to extract rowtime attribute
+    val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression(
+      tableSource,
+      selectedFields,
+      cluster,
+      tableEnv.getRelBuilder,
+      Types.SQL_TIMESTAMP
+    )
+
+    // ingest table and convert and extract time attributes if necessary
+    convertToInternalRow(
+      outputSchema,
+      inputDataSet,
+      fieldIndexes,
+      config,
+      rowtimeExpression)
   }
 }
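
The new type check in translateToPlan guards a common implementation error: getReturnType must describe exactly the type of the DataSet the source produces. A minimal sketch of a conforming source; the class name, fields, and data are illustrative only:

import java.util.Collections

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.{TableSchema, Types}
import org.apache.flink.table.sources.BatchTableSource
import org.apache.flink.types.Row

class WordCountSource extends BatchTableSource[Row] {

  private val returnType = new RowTypeInfo(
    Array[TypeInformation[_]](Types.STRING, Types.LONG),
    Array("word", "cnt"))

  override def getReturnType: TypeInformation[Row] = returnType

  // the DataSet is created with the same TypeInformation that is declared,
  // so the check in translateToPlan passes
  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] =
    execEnv.fromCollection(
      Collections.singletonList(Row.of("hello", Long.box(1L))), returnType)

  override def getTableSchema: TableSchema =
    new TableSchema(
      Array("word", "cnt"),
      Array[TypeInformation[_]](Types.STRING, Types.LONG))
}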

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
index b1cf106..07ba7e1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala
@@ -25,7 +25,7 @@ import org.apache.calcite.rel.core.TableScan
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.table.api.BatchTableEnvironment
-import org.apache.flink.table.plan.schema.DataSetTable
+import org.apache.flink.table.plan.schema.{DataSetTable, RowSchema}
 import org.apache.flink.types.Row
 
 /**
@@ -60,9 +60,11 @@ class DataSetScan(
   }
 
   override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
-    val config = tableEnv.getConfig
+    val schema = new RowSchema(rowRelDataType)
     val inputDataSet: DataSet[Any] = dataSetTable.dataSet
-    convertToInternalRow(inputDataSet, dataSetTable, config)
+    val fieldIdxs = dataSetTable.fieldIndexes
+    val config = tableEnv.getConfig
+    convertToInternalRow(schema, inputDataSet, fieldIdxs, config, None)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
index 9352efb..59b3120 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala
@@ -60,7 +60,8 @@ class DataStreamScan(
 
     val config = tableEnv.getConfig
     val inputDataStream: DataStream[Any] = dataStreamTable.dataStream
-    convertToInternalRow(schema, inputDataStream, dataStreamTable, config)
+    val fieldIdxs = dataStreamTable.fieldIndexes
+    convertToInternalRow(schema, inputDataStream, fieldIdxs, config, None)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
index 4aca856..dd779de 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala
@@ -18,69 +18,111 @@
 
 package org.apache.flink.table.plan.nodes.datastream
 
+import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.codegen.FunctionCodeGenerator
+import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
 import org.apache.flink.table.plan.nodes.CommonScan
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.types.Row
-import org.apache.flink.table.plan.schema.FlinkTable
 import org.apache.flink.table.runtime.CRowOutputProcessRunner
 import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
-
-import scala.collection.JavaConverters._
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 
 trait StreamScan extends CommonScan[CRow] with DataStreamRel {
 
   protected def convertToInternalRow(
       schema: RowSchema,
       input: DataStream[Any],
-      flinkTable: FlinkTable[_],
-      config: TableConfig)
-    : DataStream[CRow] = {
+      fieldIdxs: Array[Int],
+      config: TableConfig,
+      rowtimeExpression: Option[RexNode]): DataStream[CRow] = {
 
     val inputType = input.getType
-    val internalType = CRowTypeInfo(schema.typeInfo)
+    val internalType = schema.typeInfo
+    val cRowType = CRowTypeInfo(internalType)
 
-    // conversion
-    if (needsConversion(input.getType, internalType)) {
+    val hasTimeIndicator = fieldIdxs.exists(f =>
+      f == TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER ||
+      f == TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER)
 
-      val generator = new FunctionCodeGenerator(
-        config,
-        false,
-        inputType,
-        None,
-        Some(flinkTable.fieldIndexes))
+    if (input.getType == cRowType && !hasTimeIndicator) {
+      // input is already a CRow with correct type
+      input.asInstanceOf[DataStream[CRow]]
 
-      val conversion = generator.generateConverterResultExpression(
-        schema.typeInfo,
-        schema.fieldNames)
+    } else if (input.getType == internalType && !hasTimeIndicator) {
+      // input is already of correct type. Only need to wrap it as CRow
+      input.asInstanceOf[DataStream[Row]].map(new RichMapFunction[Row, CRow] {
+        @transient private var outCRow: CRow = null
+        override def open(parameters: Configuration): Unit = {
+          outCRow = new CRow(null, change = true)
+        }
 
-      val body =
-        s"""
-           |${conversion.code}
-           |${generator.collectorTerm}.collect(${conversion.resultTerm});
-           |""".stripMargin
+        override def map(v: Row): CRow = {
+          outCRow.row = v
+          outCRow
+        }
+      }).returns(cRowType)
 
-      val function = generator.generateFunction(
+    } else {
+      // input needs to be converted and wrapped as CRow or time indicators need to be generated
+
+      val function = generateConversionProcessFunction(
+        config,
+        inputType,
+        internalType,
         "DataStreamSourceConversion",
-        classOf[ProcessFunction[Any, Row]],
-        body,
-        schema.typeInfo)
+        schema.fieldNames,
+        fieldIdxs,
+        rowtimeExpression
+      )
 
       val processFunc = new CRowOutputProcessRunner(
         function.name,
         function.code,
-        internalType)
+        cRowType)
 
-      val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
+      val opName = s"from: (${schema.fieldNames.mkString(", ")})"
 
-      input.process(processFunc).name(opName).returns(internalType)
-    }
-    // no conversion necessary, forward
-    else {
-      input.asInstanceOf[DataStream[CRow]]
+      input.process(processFunc).name(opName).returns(cRowType)
     }
   }
+
+  private def generateConversionProcessFunction(
+      config: TableConfig,
+      inputType: TypeInformation[Any],
+      outputType: TypeInformation[Row],
+      conversionOperatorName: String,
+      fieldNames: Seq[String],
+      inputFieldMapping: Array[Int],
+      rowtimeExpression: Option[RexNode]): GeneratedFunction[ProcessFunction[Any, Row], Row] = {
+
+    val generator = new FunctionCodeGenerator(
+      config,
+      false,
+      inputType,
+      None,
+      Some(inputFieldMapping))
+
+    val conversion = generator.generateConverterResultExpression(
+      outputType,
+      fieldNames,
+      rowtimeExpression)
+
+    val body =
+      s"""
+         |${conversion.code}
+         |${generator.collectorTerm}.collect(${conversion.resultTerm});
+         |""".stripMargin
+
+    generator.generateFunction(
+      "DataStreamSourceConversion",
+      classOf[ProcessFunction[Any, Row]],
+      body,
+      outputType)
+  }
 }
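
For readers unfamiliar with the code generation path: the generated conversion is roughly what one would write by hand as a ProcessFunction that remaps physical input fields to the logical schema. An illustrative sketch for an assumed inputFieldMapping of Array(2, 0); the generated code is specialized per type and avoids the casts shown here:

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

// hand-written analogue of a generated conversion: logical field 0 reads
// physical field 2, logical field 1 reads physical field 0
val conversion = new ProcessFunction[Any, Row] {
  override def processElement(
      value: Any,
      ctx: ProcessFunction[Any, Row]#Context,
      out: Collector[Row]): Unit = {
    val in = value.asInstanceOf[Row]
    val result = new Row(2)
    result.setField(0, in.getField(2))
    result.setField(1, in.getField(0))
    out.collect(result)
  }
}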

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 937f9c1..46acd6c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -22,28 +22,35 @@ import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.RexNode
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
+import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks}
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
-import org.apache.flink.table.plan.schema.{RowSchema, StreamTableSourceTable}
+import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.types.CRow
-import org.apache.flink.table.sources.{StreamTableSource, TableSource}
+import org.apache.flink.table.sources._
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 
 /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
 class StreamTableSourceScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
-    tableSource: StreamTableSource[_])
-  extends PhysicalTableSourceScan(cluster, traitSet, table, tableSource)
+    tableSource: StreamTableSource[_],
+    selectedFields: Option[Array[Int]])
+  extends PhysicalTableSourceScan(cluster, traitSet, table, tableSource, selectedFields)
   with StreamScan {
 
   override def deriveRowType(): RelDataType = {
-
-    StreamTableSourceTable.deriveRowTypeOfTableSource(
-      tableSource.asInstanceOf[StreamTableSource[_]],
-      cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory])
+    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    TableSourceUtil.getRelDataType(
+      tableSource,
+      selectedFields,
+      streaming = true,
+      flinkTypeFactory)
   }
 
   override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
@@ -56,20 +63,21 @@ class StreamTableSourceScan(
       cluster,
       traitSet,
       getTable,
-      tableSource
+      tableSource,
+      selectedFields
     )
   }
 
   override def copy(
       traitSet: RelTraitSet,
-      newTableSource: TableSource[_])
-    : PhysicalTableSourceScan = {
+      newTableSource: TableSource[_]): PhysicalTableSourceScan = {
 
     new StreamTableSourceScan(
       cluster,
       traitSet,
       getTable,
-      newTableSource.asInstanceOf[StreamTableSource[_]]
+      newTableSource.asInstanceOf[StreamTableSource[_]],
+      selectedFields
     )
   }
 
@@ -77,12 +85,101 @@ class StreamTableSourceScan(
       tableEnv: StreamTableEnvironment,
       queryConfig: StreamQueryConfig): DataStream[CRow] = {
 
+    val fieldIndexes = TableSourceUtil.computeIndexMapping(
+      tableSource,
+      isStreamTable = true,
+      selectedFields)
+
     val config = tableEnv.getConfig
     val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
-    convertToInternalRow(
-      new RowSchema(getRowType),
+    val outputSchema = new RowSchema(this.getRowType)
+
+    // check that declared and actual type of table source DataStream are identical
+    if (inputDataStream.getType != tableSource.getReturnType) {
+      throw new TableException(s"TableSource of type ${tableSource.getClass.getCanonicalName} " +
+        s"returned a DataStream of type ${inputDataStream.getType} that does not match with the " +
+        s"type ${tableSource.getReturnType} declared by the TableSource.getReturnType() method. " +
+        s"Please validate the implementation of the TableSource.")
+    }
+
+    // get expression to extract rowtime attribute
+    val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression(
+      tableSource,
+      selectedFields,
+      cluster,
+      tableEnv.getRelBuilder,
+      TimeIndicatorTypeInfo.ROWTIME_INDICATOR
+    )
+
+    // ingest table and convert and extract time attributes if necessary
+    val ingestedTable = convertToInternalRow(
+      outputSchema,
       inputDataStream,
-      new StreamTableSourceTable(tableSource),
-      config)
+      fieldIndexes,
+      config,
+      rowtimeExpression)
+
+    // generate watermarks for rowtime indicator
+    val rowtimeDesc: Option[RowtimeAttributeDescriptor] =
+      TableSourceUtil.getRowtimeAttributeDescriptor(tableSource, selectedFields)
+
+    val withWatermarks = if (rowtimeDesc.isDefined) {
+      val rowtimeFieldIdx = outputSchema.fieldNames.indexOf(rowtimeDesc.get.getAttributeName)
+      val watermarkStrategy = rowtimeDesc.get.getWatermarkStrategy
+      watermarkStrategy match {
+        case p: PeriodicWatermarkAssigner =>
+          val watermarkGenerator = new PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p)
+          ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
+        case p: PunctuatedWatermarkAssigner =>
+          val watermarkGenerator = new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p)
+          ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
+      }
+    } else {
+      // No need to generate watermarks if no rowtime attribute is specified.
+      ingestedTable
+    }
+
+    withWatermarks
+  }
+}
+
+/**
+  * Generates periodic watermarks based on a [[PeriodicWatermarkAssigner]].
+  *
+  * @param timeFieldIdx the index of the rowtime attribute.
+  * @param assigner the watermark assigner.
+  */
+private class PeriodicWatermarkAssignerWrapper(
+    timeFieldIdx: Int,
+    assigner: PeriodicWatermarkAssigner)
+  extends AssignerWithPeriodicWatermarks[CRow] {
+
+  override def getCurrentWatermark: Watermark = assigner.getWatermark
+
+  override def extractTimestamp(crow: CRow, previousElementTimestamp: Long): Long = {
+    val timestamp: Long = crow.row.getField(timeFieldIdx).asInstanceOf[Long]
+    assigner.nextTimestamp(timestamp)
+    0L
+  }
+}
+
+/**
+  * Generates punctuated watermarks based on a [[PunctuatedWatermarkAssigner]].
+  *
+  * @param timeFieldIdx the index of the rowtime attribute.
+  * @param assigner the watermark assigner.
+  */
+private class PunctuatedWatermarkAssignerWrapper(
+    timeFieldIdx: Int,
+    assigner: PunctuatedWatermarkAssigner)
+  extends AssignerWithPunctuatedWatermarks[CRow] {
+
+  override def checkAndGetNextWatermark(crow: CRow, ts: Long): Watermark = {
+    val timestamp: Long = crow.row.getField(timeFieldIdx).asInstanceOf[Long]
+    assigner.getWatermark(crow.row, timestamp)
+  }
+
+  override def extractTimestamp(element: CRow, previousElementTimestamp: Long): Long = {
+    0L
   }
 }
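
The two wrappers above adapt the planner-independent assigners to DataStream API watermark assigners. A rowtime attribute would typically be paired with a strategy such as the following bounded out-of-order assigner, a minimal sketch against the PeriodicWatermarkAssigner interface as used by the wrapper (the class name and the 5 second bound are illustrative):

import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.sources.PeriodicWatermarkAssigner

// emits watermarks that trail the highest observed timestamp by 5 seconds
class BoundedOutOfOrderAssigner extends PeriodicWatermarkAssigner {

  private var maxTimestamp: Long = Long.MinValue

  override def nextTimestamp(timestamp: Long): Unit = {
    maxTimestamp = Math.max(maxTimestamp, timestamp)
  }

  override def getWatermark: Watermark = {
    // no watermark until the first timestamp has been seen
    if (maxTimestamp == Long.MinValue) null
    else new Watermark(maxTimestamp - 5000L)
  }
}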

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
index b009cf1..337025f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
@@ -25,11 +25,11 @@ import org.apache.calcite.rel.core.TableScan
 import org.apache.calcite.rel.logical.LogicalTableScan
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter}
-import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.FlinkConventions
-import org.apache.flink.table.plan.schema.{StreamTableSourceTable, TableSourceTable}
-import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource}
+import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceTable}
+import org.apache.flink.table.sources.{TableSource, TableSourceUtil}
 
 import scala.collection.JavaConverters._
 
@@ -37,27 +37,27 @@ class FlinkLogicalTableSourceScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
-    val tableSource: TableSource[_])
+    val tableSource: TableSource[_],
+    val selectedFields: Option[Array[Int]])
   extends TableScan(cluster, traitSet, table)
   with FlinkLogicalRel {
 
-  def copy(traitSet: RelTraitSet, tableSource: TableSource[_]): FlinkLogicalTableSourceScan = {
-    new FlinkLogicalTableSourceScan(cluster, traitSet, getTable, tableSource)
+  def copy(
+      traitSet: RelTraitSet,
+      tableSource: TableSource[_],
+      selectedFields: Option[Array[Int]]): FlinkLogicalTableSourceScan = {
+    new FlinkLogicalTableSourceScan(cluster, traitSet, getTable, tableSource, selectedFields)
   }
 
   override def deriveRowType(): RelDataType = {
-
     val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-
-    tableSource match {
-      case s: StreamTableSource[_] =>
-        StreamTableSourceTable.deriveRowTypeOfTableSource(s, flinkTypeFactory)
-      case _: BatchTableSource[_] =>
-        val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
-        val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
-        flinkTypeFactory.buildLogicalRowType(fieldNames, fieldTypes)
-      case _ => throw new TableException("Unknown TableSource type.")
+    val streamingTable = table.unwrap(classOf[TableSourceTable[_]]) match {
+      case _: StreamTableSourceTable[_] => true
+      case _: BatchTableSourceTable[_] => false
+      case t => throw TableException(s"Unknown Table type ${t.getClass}.")
     }
+
+    TableSourceUtil.getRelDataType(tableSource, selectedFields, streamingTable, flinkTypeFactory)
   }
 
   override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
@@ -67,7 +67,7 @@ class FlinkLogicalTableSourceScan(
 
   override def explainTerms(pw: RelWriter): RelWriter = {
     val terms = super.explainTerms(pw)
-        .item("fields", 
TableEnvironment.getFieldNames(tableSource).mkString(", "))
+        .item("fields", tableSource.getTableSchema.getColumnNames.mkString(", 
"))
 
     val sourceDesc = tableSource.explainSource()
     if (sourceDesc.nonEmpty) {
@@ -115,7 +115,8 @@ class FlinkLogicalTableSourceScanConverter
       rel.getCluster,
       traitSet,
       scan.getTable,
-      tableSource
+      tableSource,
+      None
     )
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
index 649433e..32ca127 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
@@ -59,7 +59,8 @@ class BatchTableSourceScanRule
       rel.getCluster,
       traitSet,
       scan.getTable,
-      scan.tableSource.asInstanceOf[BatchTableSource[_]]
+      scan.tableSource.asInstanceOf[BatchTableSource[_]],
+      scan.selectedFields
     )
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
index 10cb1f7..fca1557 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
@@ -61,7 +61,8 @@ class StreamTableSourceScanRule
       rel.getCluster,
       traitSet,
       scan.getTable,
-      scan.tableSource.asInstanceOf[StreamTableSource[_]]
+      scan.tableSource.asInstanceOf[StreamTableSource[_]],
+      scan.selectedFields
     )
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
index ae6129e..e87fa35 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
@@ -99,7 +99,7 @@ class PushFilterIntoTableSourceScanRule extends RelOptRule(
 
     // check whether we still need a RexProgram. A RexProgram is needed when either
     // projection or filter exists.
-    val newScan = scan.copy(scan.getTraitSet, newTableSource)
+    val newScan = scan.copy(scan.getTraitSet, newTableSource, scan.selectedFields)
     val newRexProgram = {
       if (remainingCondition != null || !program.projectsOnlyIdentity) {
         val expandedProjectList = program.getProjectList.asScala

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
index 503badc..3ea97ab 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
@@ -20,9 +20,10 @@ package org.apache.flink.table.plan.rules.logical
 
 import org.apache.calcite.plan.RelOptRule.{none, operand}
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.flink.table.api.TableException
 import org.apache.flink.table.plan.util.{RexProgramExtractor, RexProgramRewriter}
 import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalTableSourceScan}
-import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, NestedFieldsProjectableTableSource, ProjectableTableSource}
+import org.apache.flink.table.sources._
 
 class PushProjectIntoTableSourceScanRule extends RelOptRule(
   operand(classOf[FlinkLogicalCalc],
@@ -31,39 +32,50 @@ class PushProjectIntoTableSourceScanRule extends RelOptRule(
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val scan: FlinkLogicalTableSourceScan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
-    scan.tableSource match {
-      // projection pushdown is not supported for sources that provide time indicators
-      case r: DefinedRowtimeAttribute if r.getRowtimeAttribute != null => false
-      case p: DefinedProctimeAttribute if p.getProctimeAttribute != null => false
-      case _: ProjectableTableSource[_] => true
-      case _ => false
-    }
+
+    // only continue if we haven't pushed down a projection yet.
+    scan.selectedFields.isEmpty
   }
 
   override def onMatch(call: RelOptRuleCall) {
     val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
     val scan: FlinkLogicalTableSourceScan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
-    val usedFields = RexProgramExtractor.extractRefInputFields(calc.getProgram)
-
-    // if no fields can be projected, we keep the original plan.
     val source = scan.tableSource
-    if (scan.getRowType.getFieldCount != usedFields.length) {
 
+    val accessedLogicalFields = RexProgramExtractor.extractRefInputFields(calc.getProgram)
+    val accessedPhysicalFields = TableSourceUtil.getPhysicalIndexes(source, accessedLogicalFields)
+
+    // only continue if fields are projected or reordered.
+    // eager reordering can remove a calc operator.
+    if (!(0 until scan.getRowType.getFieldCount).toArray.sameElements(accessedLogicalFields)) {
+
+      // try to push projection of physical fields to TableSource
       val newTableSource = source match {
         case nested: NestedFieldsProjectableTableSource[_] =>
           val nestedFields = RexProgramExtractor
-            .extractRefNestedInputFields(calc.getProgram, usedFields)
-          nested.projectNestedFields(usedFields, nestedFields)
+            .extractRefNestedInputFields(calc.getProgram, accessedPhysicalFields)
+          nested.projectNestedFields(accessedPhysicalFields, nestedFields)
         case projecting: ProjectableTableSource[_] =>
-          projecting.projectFields(usedFields)
+          projecting.projectFields(accessedPhysicalFields)
+        case nonProjecting: TableSource[_] =>
+          // projection cannot be pushed to TableSource
+          nonProjecting
+      }
+
+      // check that table schema of the new table source is identical to original
+      if (source.getTableSchema != newTableSource.getTableSchema) {
+        throw new TableException("TableSchema of ProjectableTableSource must 
not be modified " +
+          "by projectFields() call. This is a bug in the implementation of the 
TableSource " +
+          s"${source.getClass.getCanonicalName}.")
       }
 
-      val newScan = scan.copy(scan.getTraitSet, newTableSource)
+      // Apply the projection during the input conversion of the scan.
+      val newScan = scan.copy(scan.getTraitSet, newTableSource, Some(accessedLogicalFields))
       val newCalcProgram = RexProgramRewriter.rewriteWithFieldProjection(
         calc.getProgram,
         newScan.getRowType,
         calc.getCluster.getRexBuilder,
-        usedFields)
+        accessedLogicalFields)
 
       if (newCalcProgram.isTrivial) {
         // drop calc if the transformed program merely returns its input and no filter exists
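
The schema check above pins down the contract for implementors: projectFields() must return a source whose table schema is unchanged, narrowing only the physical return type and the produced data. A minimal sketch, with all names, fields, and the empty result set illustrative:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.{TableSchema, Types}
import org.apache.flink.table.sources.{BatchTableSource, ProjectableTableSource}
import org.apache.flink.types.Row

class SampleProjectableSource(selected: Array[Int])
  extends BatchTableSource[Row] with ProjectableTableSource[Row] {

  private val fieldNames = Array("id", "name", "amount")
  private val fieldTypes =
    Array[TypeInformation[_]](Types.LONG, Types.STRING, Types.INT)

  // the full table schema is reported regardless of pushed projections
  override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes)

  // the physical return type reflects only the selected fields
  override def getReturnType: TypeInformation[Row] =
    new RowTypeInfo(selected.map(fieldTypes(_)), selected.map(fieldNames(_)))

  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] =
    execEnv.fromCollection(new java.util.ArrayList[Row](), getReturnType)

  // returns a copy that reads only the requested physical fields
  override def projectFields(fields: Array[Int]): SampleProjectableSource =
    new SampleProjectableSource(fields)
}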
