[FLINK-7548] [table] Refactor TableSource interface. - Improves the time attribute handling of table sources.
This commit also resolves: - FLINK-6870: Unified handling of time attributes in batch and streaming - FLINK-7179: Support for projection push down and watermark assigners - FLINK-7696: Support for projection push down and time attributes This closes #4894. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9a2ba6e0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9a2ba6e0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9a2ba6e0 Branch: refs/heads/master Commit: 9a2ba6e058e907aef65e0af8731ca5ec8733712e Parents: 760f0bc Author: Fabian Hueske <[email protected]> Authored: Thu Oct 5 17:44:35 2017 +0200 Committer: Fabian Hueske <[email protected]> Committed: Mon Oct 30 14:09:27 2017 +0100 ---------------------------------------------------------------------- docs/dev/table/sourceSinks.md | 114 ++-- .../connectors/kafka/KafkaTableSource.java | 6 + .../flink/addons/hbase/HBaseTableSource.java | 50 +- .../flink/table/api/BatchTableEnvironment.scala | 4 +- .../table/api/StreamTableEnvironment.scala | 34 +- .../flink/table/api/TableEnvironment.scala | 25 +- .../apache/flink/table/api/TableSchema.scala | 48 +- .../table/catalog/ExternalTableSourceUtil.scala | 16 +- .../flink/table/codegen/CodeGenerator.scala | 56 +- .../flink/table/plan/nodes/CommonScan.scala | 50 +- .../plan/nodes/PhysicalTableSourceScan.scala | 18 +- .../table/plan/nodes/dataset/BatchScan.scala | 68 ++- .../nodes/dataset/BatchTableSourceScan.scala | 65 ++- .../table/plan/nodes/dataset/DataSetScan.scala | 8 +- .../plan/nodes/datastream/DataStreamScan.scala | 3 +- .../plan/nodes/datastream/StreamScan.scala | 112 ++-- .../datastream/StreamTableSourceScan.scala | 131 ++++- .../logical/FlinkLogicalTableSourceScan.scala | 37 +- .../dataSet/BatchTableSourceScanRule.scala | 3 +- .../datastream/StreamTableSourceScanRule.scala | 3 +- .../PushFilterIntoTableSourceScanRule.scala | 2 +- .../PushProjectIntoTableSourceScanRule.scala | 46 +- 
.../plan/schema/BatchTableSourceTable.scala | 43 ++ .../flink/table/plan/schema/DataSetTable.scala | 3 +- .../table/plan/schema/DataStreamTable.scala | 4 +- .../flink/table/plan/schema/FlinkTable.scala | 93 --- .../flink/table/plan/schema/InlineTable.scala | 115 ++++ .../plan/schema/StreamTableSourceTable.scala | 135 +---- .../table/plan/schema/TableSourceTable.scala | 30 +- .../flink/table/plan/stats/FlinkStatistic.scala | 4 +- .../flink/table/sources/CsvTableSource.scala | 96 +++- .../table/sources/DefinedFieldMapping.scala | 55 ++ .../flink/table/sources/DefinedFieldNames.scala | 35 -- .../flink/table/sources/FieldComputer.scala | 63 ++ .../NestedFieldsProjectableTableSource.scala | 40 +- .../table/sources/ProjectableTableSource.scala | 21 +- .../flink/table/sources/TableSource.scala | 40 +- .../flink/table/sources/TableSourceUtil.scala | 522 +++++++++++++++++ .../table/sources/definedTimeAttributes.scala | 77 +-- .../table/sources/timestampExtractors.scala | 77 +++ .../table/sources/watermarkStrategies.scala | 101 ++++ .../table/typeutils/TimeIndicatorTypeInfo.scala | 7 +- .../flink/table/api/ExternalCatalogTest.scala | 10 +- .../flink/table/api/TableSourceTest.scala | 36 +- .../api/stream/table/TableSourceTest.scala | 331 +++++++---- .../validation/TableSourceValidationTest.scala | 59 -- .../validation/FlinkTableValidationTest.scala | 39 -- .../validation/InlineTableValidationTest.scala | 39 ++ .../validation/TableSourceValidationTest.scala | 244 +++++--- .../catalog/ExternalTableSourceUtilTest.scala | 5 +- .../catalog/InMemoryExternalCatalogTest.scala | 11 +- .../runtime/batch/table/TableSourceITCase.scala | 571 ++++++++++++++++++- .../runtime/stream/TimeAttributesITCase.scala | 17 +- .../stream/table/TableSourceITCase.scala | 523 ++++++++++++++++- .../table/runtime/utils/CommonTestData.scala | 2 + .../table/utils/TestFilterableTableSource.scala | 135 ----- .../table/utils/TestTableSourceWithTime.scala | 52 -- .../flink/table/utils/testTableSources.scala | 
304 ++++ 58 files changed, 3637 insertions(+), 1201 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/docs/dev/table/sourceSinks.md ---------------------------------------------------------------------- diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md index f77625b..d5bb874 100644 --- a/docs/dev/table/sourceSinks.md +++ b/docs/dev/table/sourceSinks.md @@ -298,17 +298,19 @@ Similar to using <code>JDBCOutputFormat</code>, you have to explicitly specify t Define a TableSource -------------------- -A `TableSource` is a generic interface to access to data stored in an external system as a table. It produces a `DataSet` or `DataStream` and provides the type information to derive the schema of the generated table. There are different table sources for batch tables and streaming tables. +A `TableSource` is a generic interface that gives Table API and SQL queries access to data stored in an external system. It provides the schema of the table and the records that are mapped to rows with the table's schema. Depending on whether the `TableSource` is used in a streaming or batch query, the records are produced as a `DataSet` or `DataStream`. -Schema information consists of a data type, field names, and corresponding indexes of these names in the data type. +If a `TableSource` is used in a streaming query, it must implement the `StreamTableSource` interface; if it is used in a batch query, it must implement the `BatchTableSource` interface. A `TableSource` can also implement both interfaces and be used in streaming and batch queries. 
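The dual batch/streaming contract described in the added paragraph can be sketched with plain Java. The interface and class names below are illustrative stand-ins, not the actual Flink API: the real interfaces take an execution environment and return a `DataSet`/`DataStream`.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-ins for the TableSource hierarchy (NOT the real Flink
// interfaces) that illustrate how a single class can serve both batch and
// streaming queries by implementing both sub-interfaces.
interface SketchTableSource {
    String explainSource();
}

interface SketchBatchTableSource extends SketchTableSource {
    List<String> getDataSet(); // batch: all records at once
}

interface SketchStreamTableSource extends SketchTableSource {
    Iterator<String> getDataStream(); // streaming: records one by one
}

public class DualModeSource implements SketchBatchTableSource, SketchStreamTableSource {
    private final List<String> rows = Arrays.asList("a", "b", "c");

    @Override public List<String> getDataSet() { return rows; }
    @Override public Iterator<String> getDataStream() { return rows.iterator(); }
    @Override public String explainSource() { return "DualModeSource(3 rows)"; }
}
```

Both access paths serve the same underlying rows, which is exactly the property the commit relies on for unified batch/streaming semantics.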
-The general interface looks as follows: +`StreamTableSource` and `BatchTableSource` extend the base interface `TableSource`, which defines the following methods: <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} TableSource<T> { + public TableSchema getTableSchema(); + public TypeInformation<T> getReturnType(); public String explainSource(); @@ -320,6 +322,8 @@ TableSource<T> { {% highlight scala %} TableSource[T] { + def getTableSchema: TableSchema + def getReturnType: TypeInformation[T] def explainSource: String @@ -329,15 +333,19 @@ TableSource[T] { </div> </div> -To define a `TableSource` one needs to implement `TableSource#getReturnType`. In this case field names and field indexes are derived from the returned type. +* `getTableSchema()`: Returns the schema of the table, i.e., the names and types of the fields of the table. The field types are defined using Flink's `TypeInformation` (see [Table API types](tableApi.html#data-types) and [SQL types](sql.html#data-types)). -If the `TypeInformation` returned by `getReturnType` does not allow to specify custom field names, it is possible to implement the `DefinedFieldNames` interface in addition. +* `getReturnType()`: Returns the physical type of the `DataStream` (`StreamTableSource`) or `DataSet` (`BatchTableSource`) and hence of the records that are produced by the `TableSource`. -### BatchTableSource +* `explainSource()`: Returns a String that describes the `TableSource`. This method is optional and used for display purposes only. -Defines an external `TableSource` to create a batch table and provides access to its data. +The `TableSource` interface separates the logical table schema from the physical type of the returned `DataStream` or `DataSet`. As a consequence, all fields of the table schema (`getTableSchema()`) must be mapped to a field of corresponding type in the physical return type (`getReturnType()`). By default, this mapping is done based on field names. 
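The default name-based mapping between the logical schema and the physical return type can be sketched as follows. `NameBasedMapping` and `mapByName` are hypothetical helpers for illustration, not part of Flink:

```java
import java.util.List;

// Sketch of the default name-based mapping between the logical table schema
// (getTableSchema()) and the fields of the physical return type
// (getReturnType()). Class and method names are illustrative only.
public class NameBasedMapping {

    /**
     * Returns, for every field of the logical schema, the index of the
     * physical field with the same name. Fails if a schema field has no
     * physical counterpart.
     */
    public static int[] mapByName(List<String> schemaFields, List<String> physicalFields) {
        int[] mapping = new int[schemaFields.size()];
        for (int i = 0; i < schemaFields.size(); i++) {
            int idx = physicalFields.indexOf(schemaFields.get(i));
            if (idx < 0) {
                throw new IllegalArgumentException(
                    "Schema field '" + schemaFields.get(i) + "' has no matching physical field");
            }
            mapping[i] = idx;
        }
        return mapping;
    }
}
```

Note that the physical type may contain additional fields or order its fields differently; only the presence of a same-named field per schema field matters.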
For example, a `TableSource` that defines a table schema with two fields `[name: String, size: Integer]` requires a `TypeInformation` with at least two fields called `name` and `size` of type `String` and `Integer`, respectively. This could be a `PojoTypeInfo` or a `RowTypeInfo` that has two fields named `name` and `size` with matching types. -The interface looks as follows: +However, some types, such as Tuple or CaseClass types, do not support custom field names. If a `TableSource` returns a `DataStream` or `DataSet` of a type with fixed field names, it can implement the `DefinedFieldMapping` interface to map field names from the table schema to field names of the physical return type. + +### Defining a BatchTableSource + +The `BatchTableSource` interface extends the `TableSource` interface and defines one additional method: <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> @@ -359,20 +367,20 @@ BatchTableSource[T] extends TableSource[T] { </div> </div> -{% top %} +* `getDataSet(execEnv)`: Returns a `DataSet` with the data of the table. The type of the `DataSet` must be identical to the return type defined by the `TableSource.getReturnType()` method. The `DataSet` can be created using a regular [data source]({{ site.baseurl }}/dev/batch/#data-sources) of the DataSet API. Commonly, a `BatchTableSource` is implemented by wrapping an `InputFormat` or [batch connector]({{ site.baseurl }}/dev/batch/connectors.html). -### StreamTableSource +{% top %} -Defines an external `TableSource` to create a streaming table and provides access to its data. 
+### Defining a StreamTableSource -The interface looks as follows: +The `StreamTableSource` interface extends the `TableSource` interface and defines one additional method: <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} StreamTableSource<T> extends TableSource<T> { - public DataSet<T> getDataStream(StreamExecutionEnvironment execEnv); + public DataStream<T> getDataStream(StreamExecutionEnvironment execEnv); } {% endhighlight %} </div> @@ -381,84 +389,85 @@ StreamTableSource<T> extends TableSource<T> { {% highlight scala %} StreamTableSource[T] extends TableSource[T] { - def getDataStream(execEnv: StreamExecutionEnvironment): DataSet[T] + def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T] } {% endhighlight %} </div> </div> -**IMPORTANT:** Time-based operations on streaming tables such as windows require explicitly specified [time attributes]({{ site.baseurl }}/dev/table/streaming.html#time-attributes) (both for the [Table API](tableApi.html#group-windows) and [SQL](sql.html#group-windows)). A `StreamTableSource` defines +* `getDataStream(execEnv)`: Returns a `DataStream` with the data of the table. The type of the `DataStream` must be identical to the return type defined by the `TableSource.getReturnType()` method. The `DataStream` can be created using a regular [data source]({{ site.baseurl }}/dev/datastream_api.html#data-sources) of the DataStream API. Commonly, a `StreamTableSource` is implemented by wrapping a `SourceFunction` or a [stream connector]({{ site.baseurl }}/dev/connectors/). + +### Defining a TableSource with Time Attributes -- an *event-time attribute* by implementing the `DefinedRowtimeAttribute` interface and -- a *processing-time attribute* by implementing the `DefinedProctimeAttribute` interface. 
+Time-based operations of streaming [Table API](tableApi.html#group-windows) and [SQL](sql.html#group-windows) queries, such as windowed aggregations or joins, require explicitly specified [time attributes]({{ site.baseurl }}/dev/table/streaming.html#time-attributes). -Both are described in the following sections. +A `TableSource` defines a time attribute as a field of type `Types.SQL_TIMESTAMP` in its table schema. In contrast to all regular fields in the schema, a time attribute must not be matched to a physical field in the return type of the table source. Instead, a `TableSource` defines a time attribute by implementing a certain interface. -#### DefinedRowtimeAttribute +#### Defining a Processing Time Attribute -The `DefinedRowtimeAttribute` interface provides a single method. +A `TableSource` defines a [processing time attribute](streaming.html#processing-time) by implementing the `DefinedProctimeAttribute` interface. The interface looks as follows: <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} -DefinedRowtimeAttribute { +DefinedProctimeAttribute { - public String getRowtimeAttribute(); + public String getProctimeAttribute(); } {% endhighlight %} </div> <div data-lang="scala" markdown="1"> {% highlight scala %} -DefinedRowtimeAttribute { +DefinedProctimeAttribute { - def getRowtimeAttribute(): String + def getProctimeAttribute: String } {% endhighlight %} </div> </div> -The `getRowtimeAttribute()` method returns the name of the field that holds the event-time timestamps for the rows of the table. The field must exist in the schema of the `StreamTableSource` and be of type `LONG` or `TIMESTAMP`. Moreover, the `DataStream` returned by `StreamTableSource.getDataStream()` must have [watermarks]({{ site.baseurl }}/dev/event_timestamps_watermarks.html) assigned which are aligned with the values of the specified timestamp field. 
- -Please see the documentation on [timestamp and watermark assignment]({{ site.baseurl }}/dev/event_timestamps_watermarks.html) for details on how to assign watermarks. Please note that the timestamps of a `DataStream` (the ones which are assigned by a `TimestampAssigner`) are ignored. Only the values of the `TableSource`'s rowtime field are relevant. +* `getProctimeAttribute()`: Returns the name of the processing time attribute. The specified attribute must be defined with type `Types.SQL_TIMESTAMP` in the table schema and can be used in time-based operations. A `DefinedProctimeAttribute` table source can omit the processing time attribute by returning `null`. -**Note:** A `TableSource` that returns a rowtime attribute does not support projection pushdown. +**Note** Both `StreamTableSource` and `BatchTableSource` can implement `DefinedProctimeAttribute` and define a processing time attribute. In case of a `BatchTableSource`, the processing time field is initialized with the current timestamp during the table scan. -#### DefinedProctimeAttribute +#### Defining a Rowtime Attribute -The `DefinedProctimeAttribute` interface provides a single method. +A `TableSource` defines a [rowtime attribute](streaming.html#event-time) by implementing the `DefinedRowtimeAttributes` interface. 
The interface looks as follows: <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> {% highlight java %} -DefinedProctimeAttribute { +DefinedRowtimeAttributes { - public String getProctimeAttribute(); + public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors(); } {% endhighlight %} </div> <div data-lang="scala" markdown="1"> {% highlight scala %} -DefinedProctimeAttribute { +DefinedRowtimeAttributes { - def getProctimeAttribute(): String + def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] } {% endhighlight %} </div> </div> -The `getProctimeAttribute()` method returns the name of a field that is appended to each row returned by the `StreamTableSource`. The appended field serves as a processing time timestamp and can be used in time-based operations. +* `getRowtimeAttributeDescriptors()`: Returns a list of `RowtimeAttributeDescriptor`. A `RowtimeAttributeDescriptor` describes a rowtime attribute with the following properties: + * `attributeName`: The name of the rowtime attribute in the table schema. The field must be defined with type `Types.SQL_TIMESTAMP`. + * `timestampExtractor`: The timestamp extractor extracts the timestamp from a record of the return type. For example, it can convert a Long field into a timestamp or parse a String-encoded timestamp. Flink comes with a set of built-in `TimestampExtractor` implementations for common use cases. It is also possible to provide a custom implementation. + * `watermarkStrategy`: The watermark strategy defines how watermarks are generated for the rowtime attribute. Flink comes with a set of built-in `WatermarkStrategy` implementations for common use cases. It is also possible to provide a custom implementation. +* **Note** Although the `getRowtimeAttributeDescriptors()` method returns a list of descriptors, only a single rowtime attribute is supported at the moment. 
We plan to remove this restriction in the future and support tables with more than one rowtime attribute. -**Note:** A `TableSource` that returns a processing time attribute does not support projection pushdown. +**IMPORTANT** Both, `StreamTableSource` and `BatchTableSource`, can implement `DefinedRowtimeAttributes` and define a rowtime attribute. In either case, the rowtime field is extracted using the `TimestampExtractor`. Hence, a `TableSource` that implements `StreamTableSource` and `BatchTableSource` and defines a rowtime attribute provides exactly the same data to streaming and batch queries. {% top %} -### ProjectableTableSource +### Defining a TableSource with Projection Push-Down -The `ProjectableTableSource` interface adds support for projection push-down to a `TableSource`. A `TableSource` extending this interface is able to project the fields of the return table. - -The interface looks as follows: +A `TableSource` supports projection push-down by implementing the `ProjectableTableSource` interface. The interface defines a single method: <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> @@ -474,21 +483,15 @@ ProjectableTableSource<T> { {% highlight scala %} ProjectableTableSource[T] { - def TableSource[T] projectFields(fields: Array[Int]) + def projectFields(fields: Array[Int]): TableSource[T] } {% endhighlight %} </div> </div> -The `projectFields()` is called with an array that holds the indexes of the required fields. The method returns a new `TableSource` object that returns rows with the requested schema. - -{% top %} +* `projectFields(fields)`: Returns a *copy* of the `TableSource` with adjusted physical return type. The `fields` parameter provides the indexes of the fields that must be provided by the `TableSource`. The indexes relate to the `TypeInformation` of the physical return type, *not* to the logical table schema. The copied `TableSource` must adjust its return type and the returned `DataStream` or `DataSet`. 
The `TableSchema` of the copied `TableSource` must not be changed, i.e., it must be the same as the original `TableSource`. If the `TableSource` implements the `DefinedFieldMapping` interface, the field mapping must be adjusted to the new return type. -### NestedFieldsProjectableTableSource - -The `NestedFieldsProjectableTableSource` interface adds support for projection push-down to a `TableSource` with nested fields. A `TableSource` extending this interface is able to project the nested fields of the returned table. - -The interface looks as follows: +The `ProjectableTableSource` adds support for projecting flat fields. If the `TableSource` defines a table with a nested schema, it can implement the `NestedFieldsProjectableTableSource` interface to extend the projection to nested fields. The `NestedFieldsProjectableTableSource` is defined as follows: <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> @@ -510,9 +513,13 @@ NestedFieldsProjectableTableSource[T] { </div> </div> -### FilterableTableSource +* `projectNestedField(fields, nestedFields)`: Returns a *copy* of the `TableSource` with adjusted physical return type. Fields of the physical return type may be removed or reordered but their type must not be changed. The contract of this method is essentially the same as for the `ProjectableTableSource.projectFields()` method. In addition, the `nestedFields` parameter contains, for each field index in the `fields` list, a list of paths to all nested fields that are accessed by the query. All other nested fields do not need to be read, parsed, and set in the records that are produced by the `TableSource`. **IMPORTANT** The types of the projected fields must not be changed but unused fields may be set to null or to a default value. -The `FilterableTableSource` interface adds support for filtering push-down to a `TableSource`. A `TableSource` extending this interface is able to filter records before returning. 
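The `projectFields()` contract described above can be sketched in plain Java. `ProjectionSketch` is a hypothetical stand-in, not the Flink interface; it models only the invariant that the projected copy narrows the physical fields while the logical schema stays untouched:

```java
// Sketch of the projectFields() contract: the copy produces a narrowed
// physical return type while the logical table schema stays the same.
// All names are illustrative stand-ins, not the actual Flink interfaces.
public class ProjectionSketch {
    private final String[] schemaFields;   // logical schema: must not change
    private final String[] physicalFields; // physical return type: adjusted

    public ProjectionSketch(String[] schemaFields, String[] physicalFields) {
        this.schemaFields = schemaFields;
        this.physicalFields = physicalFields;
    }

    /** Returns a copy that produces only the physical fields at the given
     *  indexes; the indexes refer to the physical type, not the schema. */
    public ProjectionSketch projectFields(int[] fields) {
        String[] projected = new String[fields.length];
        for (int i = 0; i < fields.length; i++) {
            projected[i] = physicalFields[fields[i]];
        }
        return new ProjectionSketch(schemaFields, projected); // schema unchanged
    }

    public String[] getPhysicalFields() { return physicalFields; }
    public String[] getSchemaFields() { return schemaFields; }
}
```

A real implementation would additionally rebuild its `DataStream`/`DataSet` and, if it implements `DefinedFieldMapping`, adjust the field mapping to the narrowed physical type.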
+{% top %} + +### Defining a TableSource with Filter Push-Down + +The `FilterableTableSource` interface adds support for filter push-down to a `TableSource`. A `TableSource` extending this interface is able to filter records such that the returned `DataStream` or `DataSet` contains fewer records. The interface looks as follows: @@ -540,9 +547,8 @@ FilterableTableSource[T] { </div> </div> -The optimizer pushes predicates down by calling the `applyPredicate()` method. The `TableSource` can evaluate which predicates to evaluate by itself and which to leave for the framework. Predicates which are evaluated by the `TableSource` must be removed from the `List`. All predicates which remain in the `List` after the method call returns are evaluated by the framework. The `applyPredicate()` method returns a new `TableSource` that evaluates all selected predicates. - -The `isFilterPushedDown()` method tells the optimizer whether predicates have been pushed down or not. +* `applyPredicate(predicates)`: Returns a *copy* of the `TableSource` with added predicates. The `predicates` parameter is a mutable list of conjunctive predicates that are "offered" to the `TableSource`. The `TableSource` accepts a predicate for evaluation by removing it from the list. Predicates that are left in the list will be evaluated by a subsequent filter operator. +* `isFilterPushedDown()`: Returns true if the `applyPredicate()` method was called before. Hence, `isFilterPushedDown()` must return true for all `TableSource` instances returned from an `applyPredicate()` call. 
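The `applyPredicate()` contract, accepting predicates by removing them from the offered mutable list and reporting the push-down via `isFilterPushedDown()`, can be sketched as follows. The class, the string-based predicate representation, and the "only predicates on `key`" rule are all hypothetical simplifications, not the Flink API:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch of the applyPredicate()/isFilterPushedDown() contract with
// string-based stand-in predicates. Not the actual Flink interfaces.
public class FilterPushDownSketch {
    private final List<String> pushedPredicates;
    private final boolean filterPushedDown;

    public FilterPushDownSketch() {
        this(new ArrayList<String>(), false);
    }

    private FilterPushDownSketch(List<String> pushed, boolean pushedDown) {
        this.pushedPredicates = pushed;
        this.filterPushedDown = pushedDown;
    }

    /** Accepts the predicates this source can evaluate (here: any predicate
     *  on the hypothetical "key" field) by removing them from the offered
     *  list; whatever is left is evaluated by a subsequent filter operator. */
    public FilterPushDownSketch applyPredicate(List<String> predicates) {
        List<String> accepted = new ArrayList<>(pushedPredicates);
        Iterator<String> it = predicates.iterator();
        while (it.hasNext()) {
            String predicate = it.next();
            if (predicate.startsWith("key")) {
                accepted.add(predicate);
                it.remove(); // removal signals that the source evaluates it
            }
        }
        // the copy reports the push-down even if no predicate was accepted
        return new FilterPushDownSketch(accepted, true);
    }

    public boolean isFilterPushedDown() { return filterPushedDown; }
    public List<String> getPushedPredicates() { return pushedPredicates; }
}
```

The mutation of the offered list is the interesting part of the contract: the optimizer inspects what remains in the list to decide which filters it still has to apply itself.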
{% top %} http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java index 8969f90..a9cf235 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.sources.StreamTableSource; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; @@ -84,6 +85,11 @@ public abstract class KafkaTableSource implements StreamTableSource<Row> { return typeInfo; } + @Override + public TableSchema getTableSchema() { + return TableSchema.fromTypeInfo(typeInfo); + } + /** * Returns the version-specific Kafka consumer. 
* http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java index cc7e602..eb0fc9b 100644 --- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java +++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.sources.BatchTableSource; import org.apache.flink.table.sources.ProjectableTableSource; import org.apache.flink.types.Row; @@ -58,7 +59,8 @@ public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTable private Configuration conf; private String tableName; - private HBaseTableSchema schema; + private HBaseTableSchema hBaseSchema; + private TableSchema tableSchema; /** * The HBase configuration and the name of the table to read. 
@@ -69,7 +71,14 @@ public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTable public HBaseTableSource(Configuration conf, String tableName) { this.conf = conf; this.tableName = Preconditions.checkNotNull(tableName, "Table name"); - this.schema = new HBaseTableSchema(); + this.hBaseSchema = new HBaseTableSchema(); + } + + private HBaseTableSource(Configuration conf, String tableName, TableSchema tableSchema) { + this.conf = conf; + this.tableName = Preconditions.checkNotNull(tableName, "Table name"); + this.hBaseSchema = new HBaseTableSchema(); + this.tableSchema = tableSchema; } /** @@ -80,7 +89,7 @@ public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTable * @param clazz the data type of the qualifier */ public void addColumn(String family, String qualifier, Class<?> clazz) { - this.schema.addColumn(family, qualifier, clazz); + this.hBaseSchema.addColumn(family, qualifier, clazz); } /** @@ -89,34 +98,51 @@ public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTable * @param charset Name of the charset to use. 
*/ public void setCharset(String charset) { - this.schema.setCharset(charset); + this.hBaseSchema.setCharset(charset); } @Override public TypeInformation<Row> getReturnType() { - String[] famNames = schema.getFamilyNames(); - TypeInformation<?>[] typeInfos = new TypeInformation[famNames.length]; + return new RowTypeInfo(getFieldTypes(), getFieldNames()); + } + + @Override + public TableSchema getTableSchema() { + if (this.tableSchema == null) { + return new TableSchema(getFieldNames(), getFieldTypes()); + } else { + return this.tableSchema; + } + } + + private String[] getFieldNames() { + return hBaseSchema.getFamilyNames(); + } + + private TypeInformation[] getFieldTypes() { + String[] famNames = hBaseSchema.getFamilyNames(); + TypeInformation<?>[] fieldTypes = new TypeInformation[hBaseSchema.getFamilyNames().length]; int i = 0; for (String family : famNames) { - typeInfos[i] = new RowTypeInfo(schema.getQualifierTypes(family), schema.getQualifierNames(family)); + fieldTypes[i] = new RowTypeInfo(hBaseSchema.getQualifierTypes(family), hBaseSchema.getQualifierNames(family)); i++; } - return new RowTypeInfo(typeInfos, famNames); + return fieldTypes; } @Override public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) { - return execEnv.createInput(new HBaseRowInputFormat(conf, tableName, schema), getReturnType()); + return execEnv.createInput(new HBaseRowInputFormat(conf, tableName, hBaseSchema), getReturnType()); } @Override public HBaseTableSource projectFields(int[] fields) { - String[] famNames = schema.getFamilyNames(); - HBaseTableSource newTableSource = new HBaseTableSource(this.conf, tableName); + String[] famNames = hBaseSchema.getFamilyNames(); + HBaseTableSource newTableSource = new HBaseTableSource(this.conf, tableName, getTableSchema().copy()); // Extract the family from the given fields for (int field : fields) { String family = famNames[field]; - Map<String, TypeInformation<?>> familyInfo = schema.getFamilyInfo(family); + Map<String, 
TypeInformation<?>> familyInfo = hBaseSchema.getFamilyInfo(family); for (String qualifier : familyInfo.keySet()) { // create the newSchema newTableSource.addColumn(family, qualifier, familyInfo.get(qualifier).getTypeClass()); http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala index bca5826..8a78ac4 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala @@ -36,7 +36,7 @@ import org.apache.flink.table.expressions.{Expression, TimeAttribute} import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.nodes.dataset.DataSetRel import org.apache.flink.table.plan.rules.FlinkRuleSets -import org.apache.flink.table.plan.schema.{DataSetTable, RowSchema, TableSinkTable, TableSourceTable} +import org.apache.flink.table.plan.schema.{BatchTableSourceTable, DataSetTable, RowSchema, TableSinkTable} import org.apache.flink.table.runtime.MapRunner import org.apache.flink.table.sinks.{BatchTableSink, TableSink} import org.apache.flink.table.sources.{BatchTableSource, TableSource} @@ -98,7 +98,7 @@ abstract class BatchTableEnvironment( tableSource match { case batchTableSource: BatchTableSource[_] => - registerTableInternal(name, new TableSourceTable(batchTableSource)) + registerTableInternal(name, new BatchTableSourceTable(batchTableSource)) case _ => throw new TableException("Only BatchTableSource can be registered in " + "BatchTableEnvironment") 
http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index e42beae..e80acca 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -48,7 +48,7 @@ import org.apache.flink.table.plan.schema.{DataStreamTable, RowSchema, StreamTab import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.apache.flink.table.runtime.{CRowMapRunner, OutputRowtimeProcessFunction} import org.apache.flink.table.sinks._ -import org.apache.flink.table.sources.{DefinedRowtimeAttribute, StreamTableSource, TableSource} +import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceUtil} import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TypeCheckUtils} import _root_.scala.collection.JavaConverters._ @@ -109,23 +109,19 @@ abstract class StreamTableEnvironment( override def registerTableSource(name: String, tableSource: TableSource[_]): Unit = { checkValidTableName(name) - // check if event-time is enabled - tableSource match { - case dra: DefinedRowtimeAttribute if dra.getRowtimeAttribute != null && - execEnv.getStreamTimeCharacteristic != TimeCharacteristic.EventTime => - - throw TableException( - s"A rowtime attribute requires an EventTime time characteristic in stream environment. 
" + - s"But is: ${execEnv.getStreamTimeCharacteristic}") - case _ => // ok - } - tableSource match { case streamTableSource: StreamTableSource[_] => + // check that event-time is enabled if table source includes rowtime attributes + if (TableSourceUtil.hasRowtimeAttribute(streamTableSource) && + execEnv.getStreamTimeCharacteristic != TimeCharacteristic.EventTime) { + throw TableException( + s"A rowtime attribute requires an EventTime time characteristic in stream " + + s"environment. But is: ${execEnv.getStreamTimeCharacteristic}") + } registerTableInternal(name, new StreamTableSourceTable(streamTableSource)) case _ => throw new TableException("Only StreamTableSource can be registered in " + - "StreamTableEnvironment") + "StreamTableEnvironment") } } @@ -554,14 +550,18 @@ abstract class StreamTableEnvironment( // inject rowtime field val withRowtime = rowtime match { - case Some(rt) => fieldIndexes.patch(rt._1, Seq(TimeIndicatorTypeInfo.ROWTIME_MARKER), 0) - case _ => fieldIndexes + case Some(rt) => + fieldIndexes.patch(rt._1, Seq(TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER), 0) + case _ => + fieldIndexes } // inject proctime field val withProctime = proctime match { - case Some(pt) => withRowtime.patch(pt._1, Seq(TimeIndicatorTypeInfo.PROCTIME_MARKER), 0) - case _ => withRowtime + case Some(pt) => + withRowtime.patch(pt._1, Seq(TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER), 0) + case _ => + withRowtime } withProctime http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 7e360f9..fec7f1a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -57,7 +57,7 @@ import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode} import org.apache.flink.table.plan.rules.FlinkRuleSets import org.apache.flink.table.plan.schema.{RelTable, RowSchema, TableSinkTable} import org.apache.flink.table.sinks.TableSink -import org.apache.flink.table.sources.{DefinedFieldNames, TableSource} +import org.apache.flink.table.sources.TableSource import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo import org.apache.flink.table.validate.FunctionCatalog import org.apache.flink.types.Row @@ -1171,27 +1171,4 @@ object TableEnvironment { } } - /** - * Returns field names for a given [[TableSource]]. - * - * @param tableSource The TableSource to extract field names from. - * @tparam A The type of the TableSource. - * @return An array holding the field names. - */ - def getFieldNames[A](tableSource: TableSource[A]): Array[String] = tableSource match { - case d: DefinedFieldNames => d.getFieldNames - case _ => TableEnvironment.getFieldNames(tableSource.getReturnType) - } - - /** - * Returns field indices for a given [[TableSource]]. - * - * @param tableSource The TableSource to extract field indices from. - * @tparam A The type of the TableSource. - * @return An array holding the field indices. 
- */ - def getFieldIndices[A](tableSource: TableSource[A]): Array[Int] = tableSource match { - case d: DefinedFieldNames => d.getFieldIndices - case _ => TableEnvironment.getFieldIndices(tableSource.getReturnType) - } } http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala index 6ee65f0..f7ef710 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala @@ -18,6 +18,7 @@ package org.apache.flink.table.api import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.CompositeType /** * A TableSchema represents a Table's structure. @@ -51,6 +52,13 @@ class TableSchema( val columnNameToIndex: Map[String, Int] = columnNames.zipWithIndex.toMap /** + * Returns a deep copy of the TableSchema. + */ + def copy: TableSchema = { + new TableSchema(columnNames.clone(), columnTypes.clone()) + } + + /** * Returns all type information as an array. 
*/ def getTypes: Array[TypeInformation[_]] = columnTypes @@ -99,7 +107,7 @@ class TableSchema( } } - override def toString = { + override def toString: String = { val builder = new StringBuilder builder.append("root\n") columnNames.zip(columnTypes).foreach{ case (name, typeInfo) => @@ -108,4 +116,42 @@ class TableSchema( builder.toString() } + override def equals(other: Any): Boolean = { + other match { + case that: TableSchema if that.canEqual(this) => + this.columnNames.deep == that.columnNames.deep && + this.columnTypes.deep == that.columnTypes.deep + case _ => false + } + } + + def canEqual(other: Any): Boolean = other.isInstanceOf[TableSchema] + +} + +object TableSchema { + + /** + * Creates a [[TableSchema]] from a [[TypeInformation]]. + * If the [[TypeInformation]] is a [[CompositeType]], the field names and types for the composite + * type are used to construct the [[TableSchema]]. + * Otherwise, a [[TableSchema]] with a single field is created. The field name is "f0" and the + * field type is the provided type. + * + * @param typeInfo The [[TypeInformation]] from which the [[TableSchema]] is generated. + * @return The [[TableSchema]] that was generated from the given [[TypeInformation]]. + */ + def fromTypeInfo(typeInfo: TypeInformation[_]): TableSchema = { + typeInfo match { + case c: CompositeType[_] => + // get field names and types from composite type + val fieldNames = c.getFieldNames + val fieldTypes = fieldNames.map(c.getTypeAt).asInstanceOf[Array[TypeInformation[_]]] + new TableSchema(fieldNames, fieldTypes) + case t: TypeInformation[_] => + // create table schema with a single field named "f0" of the given type.
+ new TableSchema(Array("f0"), Array(t)) + } + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala index 6bacac1..d2e297f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala @@ -23,10 +23,10 @@ import java.net.URL import org.apache.commons.configuration.{ConfigurationException, ConversionException, PropertiesConfiguration} import org.apache.flink.annotation.VisibleForTesting import org.apache.flink.table.annotation.TableType -import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, NoMatchedTableSourceConverterException} -import org.apache.flink.table.plan.schema.{StreamTableSourceTable, TableSourceTable} +import org.apache.flink.table.api.{AmbiguousTableSourceConverterException, NoMatchedTableSourceConverterException, TableException} +import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceTable} import org.apache.flink.table.plan.stats.FlinkStatistic -import org.apache.flink.table.sources.{StreamTableSource, TableSource} +import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource} import org.apache.flink.table.util.Logging import org.apache.flink.util.InstantiationUtil import org.reflections.Reflections @@ -124,9 +124,15 @@ object ExternalTableSourceUtil extends Logging { } else { FlinkStatistic.UNKNOWN } + convertedTableSource match { - case s : StreamTableSource[_] => new 
StreamTableSourceTable(s, flinkStatistic) - case _ => new TableSourceTable(convertedTableSource, flinkStatistic) + case s: StreamTableSource[_] => + new StreamTableSourceTable(s, flinkStatistic) + case b: BatchTableSource[_] => + new BatchTableSourceTable(b, flinkStatistic) + case _ => + throw new TableException("Unknown TableSource type.") + } } case None => http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala index 5e7ec32..be7f84f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala @@ -37,11 +37,10 @@ import org.apache.flink.table.api.TableConfig import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.codegen.CodeGenUtils._ import org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE} -import org.apache.flink.table.codegen.calls.FunctionGenerator +import org.apache.flink.table.codegen.calls.{CurrentTimePointCallGen, FunctionGenerator} import org.apache.flink.table.codegen.calls.ScalarOperators._ import org.apache.flink.table.functions.sql.{ProctimeSqlFunction, ScalarSqlFunctions} import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils - import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction} import org.apache.flink.table.typeutils.TypeCheckUtils._ @@ -242,44 +241,39 @@ abstract class CodeGenerator( * * @param returnType conversion target type. Inputs and output must have the same arity. 
* @param resultFieldNames result field names necessary for a mapping to POJO fields. + * @param rowtimeExpression an optional expression to extract the value of a rowtime field from + * the input data. If not set, the value of rowtime field is set to the + * StreamRecord timestamp. * @return instance of GeneratedExpression */ def generateConverterResultExpression( returnType: TypeInformation[_ <: Any], - resultFieldNames: Seq[String]) + resultFieldNames: Seq[String], + rowtimeExpression: Option[RexNode] = None) : GeneratedExpression = { val input1AccessExprs = input1Mapping.map { - case TimeIndicatorTypeInfo.ROWTIME_MARKER => - // attribute is a rowtime indicator. Access event-time timestamp in StreamRecord. - generateRowtimeAccess() - case TimeIndicatorTypeInfo.PROCTIME_MARKER => + case TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER | + TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER => + // attribute is a rowtime indicator. + rowtimeExpression match { + case Some(expr) => + // generate rowtime attribute from expression + generateExpression(expr) + case _ => + // fetch rowtime attribute from StreamRecord timestamp field + generateRowtimeAccess() + } + case TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER => // attribute is proctime indicator. // we use a null literal and generate a timestamp when we need it. generateNullLiteral(TimeIndicatorTypeInfo.PROCTIME_INDICATOR) + case TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER => + // attribute is proctime field in a batch query. + // it is initialized with the current time. 
+ generateCurrentTimestamp() case idx => - // get type of result field - val outIdx = input1Mapping.indexOf(idx) - val outType = returnType match { - case pt: PojoTypeInfo[_] => pt.getTypeAt(resultFieldNames(outIdx)) - case ct: CompositeType[_] => ct.getTypeAt(outIdx) - case t: TypeInformation[_] => t - } - val inputAccess = generateInputAccess(input1, input1Term, idx) - // Change output type to rowtime indicator - if (FlinkTypeFactory.isRowtimeIndicatorType(outType) && - (inputAccess.resultType == Types.LONG || inputAccess.resultType == Types.SQL_TIMESTAMP)) { - // This case is required for TableSources that implement DefinedRowtimeAttribute. - // Hard cast possible because LONG, TIMESTAMP, and ROWTIME_INDICATOR are internally - // represented as Long. - GeneratedExpression( - inputAccess.resultTerm, - inputAccess.nullTerm, - inputAccess.code, - TimeIndicatorTypeInfo.ROWTIME_INDICATOR) - } else { - inputAccess - } + generateInputAccess(input1, input1Term, idx) } val input2AccessExprs = input2 match { @@ -1332,6 +1326,10 @@ abstract class CodeGenerator( GeneratedExpression(resultTerm, NEVER_NULL, resultCode, SqlTimeTypeInfo.TIMESTAMP) } + private[flink] def generateCurrentTimestamp(): GeneratedExpression = { + new CurrentTimePointCallGen(Types.SQL_TIMESTAMP, false).generate(this, Seq()) + } + // ---------------------------------------------------------------------------------------------- // Reusable code snippets // ---------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala index 996d62c..252b748 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala @@ -18,55 +18,7 @@ package org.apache.flink.table.plan.nodes -import org.apache.flink.api.common.functions.Function -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.table.api.TableConfig -import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction} -import org.apache.flink.types.Row - /** * Common class for batch and stream scans. */ -trait CommonScan[T] { - - /** - * We check if the input type is exactly the same as the internal row type. - * A conversion is necessary if types differ. - */ - private[flink] def needsConversion( - externalTypeInfo: TypeInformation[Any], - internalTypeInfo: TypeInformation[T]): Boolean = - externalTypeInfo != internalTypeInfo - - private[flink] def generatedConversionFunction[F <: Function]( - config: TableConfig, - functionClass: Class[F], - inputType: TypeInformation[Any], - expectedType: TypeInformation[Row], - conversionOperatorName: String, - fieldNames: Seq[String], - inputFieldMapping: Option[Array[Int]] = None) - : GeneratedFunction[F, Row] = { - - val generator = new FunctionCodeGenerator( - config, - false, - inputType, - None, - inputFieldMapping) - val conversion = generator.generateConverterResultExpression(expectedType, fieldNames) - - val body = - s""" - |${conversion.code} - |return ${conversion.resultTerm}; - |""".stripMargin - - generator.generateFunction( - conversionOperatorName, - functionClass, - body, - expectedType) - } - -} +trait CommonScan[T] http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala index 5872d8c..c9b4243 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala @@ -22,9 +22,10 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptTable, RelTraitSet} import org.apache.calcite.rel.RelWriter import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.TableScan -import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.TableException import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.sources.TableSource +import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceTable} +import org.apache.flink.table.sources.{TableSource, TableSourceUtil} import scala.collection.JavaConverters._ @@ -32,14 +33,19 @@ abstract class PhysicalTableSourceScan( cluster: RelOptCluster, traitSet: RelTraitSet, table: RelOptTable, - val tableSource: TableSource[_]) + val tableSource: TableSource[_], + val selectedFields: Option[Array[Int]]) extends TableScan(cluster, traitSet, table) { override def deriveRowType(): RelDataType = { val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] - flinkTypeFactory.buildLogicalRowType( - TableEnvironment.getFieldNames(tableSource), - TableEnvironment.getFieldTypes(tableSource.getReturnType)) + val streamingTable = table.unwrap(classOf[TableSourceTable[_]]) match { + case _: StreamTableSourceTable[_] => true + case _: BatchTableSourceTable[_] => false + case t => throw TableException(s"Unknown Table type ${t.getClass}.") + } + + TableSourceUtil.getRelDataType(tableSource, selectedFields, 
streamingTable, flinkTypeFactory) } override def explainTerms(pw: RelWriter): RelWriter = { http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala index 95707b8..85152d8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala @@ -18,48 +18,52 @@ package org.apache.flink.table.plan.nodes.dataset +import org.apache.calcite.rex.RexNode import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet import org.apache.flink.table.api.TableConfig -import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction} import org.apache.flink.table.plan.nodes.CommonScan -import org.apache.flink.table.plan.schema.FlinkTable +import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.table.runtime.MapRunner +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo import org.apache.flink.types.Row -import scala.collection.JavaConversions._ -import scala.collection.JavaConverters._ - trait BatchScan extends CommonScan[Row] with DataSetRel { protected def convertToInternalRow( + schema: RowSchema, input: DataSet[Any], - flinkTable: FlinkTable[_], - config: TableConfig) - : DataSet[Row] = { + fieldIdxs: Array[Int], + config: TableConfig, + rowtimeExpression: Option[RexNode]): DataSet[Row] = { val inputType = input.getType + val internalType = 
schema.typeInfo - val internalType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType) + val hasTimeIndicator = fieldIdxs.exists(f => + f == TimeIndicatorTypeInfo.ROWTIME_BATCH_MARKER || + f == TimeIndicatorTypeInfo.PROCTIME_BATCH_MARKER) // conversion - if (needsConversion(inputType, internalType)) { + if (inputType != internalType || hasTimeIndicator) { - val function = generatedConversionFunction( + val function = generateConversionMapper( config, - classOf[MapFunction[Any, Row]], inputType, internalType, "DataSetSourceConversion", - getRowType.getFieldNames, - Some(flinkTable.fieldIndexes)) + schema.fieldNames, + fieldIdxs, + rowtimeExpression) val runner = new MapRunner[Any, Row]( function.name, function.code, function.returnType) - val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})" + val opName = s"from: (${schema.fieldNames.mkString(", ")})" input.map(runner).name(opName) } @@ -68,4 +72,38 @@ trait BatchScan extends CommonScan[Row] with DataSetRel { input.asInstanceOf[DataSet[Row]] } } + + private def generateConversionMapper( + config: TableConfig, + inputType: TypeInformation[Any], + outputType: TypeInformation[Row], + conversionOperatorName: String, + fieldNames: Seq[String], + inputFieldMapping: Array[Int], + rowtimeExpression: Option[RexNode]): GeneratedFunction[MapFunction[Any, Row], Row] = { + + val generator = new FunctionCodeGenerator( + config, + false, + inputType, + None, + Some(inputFieldMapping)) + + val conversion = generator.generateConverterResultExpression( + outputType, + fieldNames, + rowtimeExpression) + + val body = + s""" + |${conversion.code} + |return ${conversion.resultTerm}; + |""".stripMargin + + generator.generateFunction( + "DataSetSourceConversion", + classOf[MapFunction[Any, Row]], + body, + outputType) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala 
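The BatchScan hunk above replaces the old `needsConversion` type-equality check with a decision that also triggers a conversion mapper when the field index mapping contains a time indicator marker, so that rowtime and proctime attributes get materialized for batch queries. A rough sketch of that decision logic — the marker values and type names here are illustrative stand-ins, not the actual `TimeIndicatorTypeInfo` constants:

```java
import java.util.Arrays;

// Sketch of the conversion decision in BatchScan.convertToInternalRow:
// convert when the physical input type differs from the internal row type,
// OR when the field mapping carries a time indicator marker that must be
// materialized. Marker values below are stand-ins for illustration only.
public class ConversionDecision {

    // stand-in sentinel indices for time attributes in the field mapping
    static final int ROWTIME_BATCH_MARKER = -3;
    static final int PROCTIME_BATCH_MARKER = -4;

    static boolean hasTimeIndicator(int[] fieldIdxs) {
        return Arrays.stream(fieldIdxs)
            .anyMatch(f -> f == ROWTIME_BATCH_MARKER || f == PROCTIME_BATCH_MARKER);
    }

    static boolean needsConversion(String inputType, String internalType, int[] fieldIdxs) {
        // types differ, or a time attribute must be generated during conversion
        return !inputType.equals(internalType) || hasTimeIndicator(fieldIdxs);
    }

    public static void main(String[] args) {
        // identical types, plain field mapping: no conversion needed
        System.out.println(needsConversion("Row(a, b)", "Row(a, b)", new int[]{0, 1}));
        // identical types, but field 1 is a batch rowtime marker: conversion needed
        System.out.println(needsConversion("Row(a, b)", "Row(a, b)", new int[]{0, -3}));
    }
}
```

This is why `DataSetScan` and `DataStreamScan` now pass the table's raw `fieldIndexes` through to `convertToInternalRow`: the markers travel inside the index mapping instead of being resolved by the scan node itself.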
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala index 74aac43..d09d69c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala @@ -20,13 +20,15 @@ package org.apache.flink.table.plan.nodes.dataset import org.apache.calcite.plan._ import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rex.RexNode import org.apache.flink.api.java.DataSet -import org.apache.flink.table.api.{BatchTableEnvironment, TableEnvironment} +import org.apache.flink.table.api.{BatchTableEnvironment, TableException, Types} import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan -import org.apache.flink.table.plan.schema.TableSourceTable -import org.apache.flink.table.sources.{BatchTableSource, TableSource} +import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.sources._ import org.apache.flink.types.Row /** Flink RelNode to read data from an external source defined by a [[BatchTableSource]]. 
*/ @@ -34,15 +36,18 @@ class BatchTableSourceScan( cluster: RelOptCluster, traitSet: RelTraitSet, table: RelOptTable, - tableSource: BatchTableSource[_]) - extends PhysicalTableSourceScan(cluster, traitSet, table, tableSource) + tableSource: BatchTableSource[_], + selectedFields: Option[Array[Int]]) + extends PhysicalTableSourceScan(cluster, traitSet, table, tableSource, selectedFields) with BatchScan { - override def deriveRowType() = { + override def deriveRowType(): RelDataType = { val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory] - flinkTypeFactory.buildLogicalRowType( - TableEnvironment.getFieldNames(tableSource), - TableEnvironment.getFieldTypes(tableSource.getReturnType)) + TableSourceUtil.getRelDataType( + tableSource, + selectedFields, + streaming = false, + flinkTypeFactory) } override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { @@ -55,26 +60,58 @@ class BatchTableSourceScan( cluster, traitSet, getTable, - tableSource + tableSource, + selectedFields ) } override def copy( traitSet: RelTraitSet, - newTableSource: TableSource[_]) - : PhysicalTableSourceScan = { + newTableSource: TableSource[_]): PhysicalTableSourceScan = { new BatchTableSourceScan( cluster, traitSet, getTable, - newTableSource.asInstanceOf[BatchTableSource[_]] + newTableSource.asInstanceOf[BatchTableSource[_]], + selectedFields ) } override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = { + + val fieldIndexes = TableSourceUtil.computeIndexMapping( + tableSource, + isStreamTable = false, + selectedFields) + val config = tableEnv.getConfig val inputDataSet = tableSource.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]] - convertToInternalRow(inputDataSet, new TableSourceTable(tableSource), config) + val outputSchema = new RowSchema(this.getRowType) + + // check that declared and actual type of table source DataSet are identical + if (inputDataSet.getType != tableSource.getReturnType) { + throw 
new TableException(s"TableSource of type ${tableSource.getClass.getCanonicalName} " + + s"returned a DataSet of type ${inputDataSet.getType} that does not match with the " + + s"type ${tableSource.getReturnType} declared by the TableSource.getReturnType() method. " + + s"Please validate the implementation of the TableSource.") + } + + // get expression to extract rowtime attribute + val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression( + tableSource, + selectedFields, + cluster, + tableEnv.getRelBuilder, + Types.SQL_TIMESTAMP + ) + + // ingest table and convert and extract time attributes if necessary + convertToInternalRow( + outputSchema, + inputDataSet, + fieldIndexes, + config, + rowtimeExpression) } } http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala index b1cf106..07ba7e1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetScan.scala @@ -25,7 +25,7 @@ import org.apache.calcite.rel.core.TableScan import org.apache.calcite.rel.metadata.RelMetadataQuery import org.apache.flink.api.java.DataSet import org.apache.flink.table.api.BatchTableEnvironment -import org.apache.flink.table.plan.schema.DataSetTable +import org.apache.flink.table.plan.schema.{DataSetTable, RowSchema} import org.apache.flink.types.Row /** @@ -60,9 +60,11 @@ class DataSetScan( } override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = { - val config = tableEnv.getConfig + val schema 
= new RowSchema(rowRelDataType) val inputDataSet: DataSet[Any] = dataSetTable.dataSet - convertToInternalRow(inputDataSet, dataSetTable, config) + val fieldIdxs = dataSetTable.fieldIndexes + val config = tableEnv.getConfig + convertToInternalRow(schema, inputDataSet, fieldIdxs, config, None) } } http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala index 9352efb..59b3120 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamScan.scala @@ -60,7 +60,8 @@ class DataStreamScan( val config = tableEnv.getConfig val inputDataStream: DataStream[Any] = dataStreamTable.dataStream - convertToInternalRow(schema, inputDataStream, dataStreamTable, config) + val fieldIdxs = dataStreamTable.fieldIndexes + convertToInternalRow(schema, inputDataStream, fieldIdxs, config, None) } } http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala index 4aca856..dd779de 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamScan.scala @@ -18,69 +18,111 @@ package org.apache.flink.table.plan.nodes.datastream +import org.apache.calcite.rex.RexNode +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.table.api.TableConfig -import org.apache.flink.table.codegen.FunctionCodeGenerator +import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction} import org.apache.flink.table.plan.nodes.CommonScan import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.types.Row -import org.apache.flink.table.plan.schema.FlinkTable import org.apache.flink.table.runtime.CRowOutputProcessRunner import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} - -import scala.collection.JavaConverters._ +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo trait StreamScan extends CommonScan[CRow] with DataStreamRel { protected def convertToInternalRow( schema: RowSchema, input: DataStream[Any], - flinkTable: FlinkTable[_], - config: TableConfig) - : DataStream[CRow] = { + fieldIdxs: Array[Int], + config: TableConfig, + rowtimeExpression: Option[RexNode]): DataStream[CRow] = { val inputType = input.getType - val internalType = CRowTypeInfo(schema.typeInfo) + val internalType = schema.typeInfo + val cRowType = CRowTypeInfo(internalType) - // conversion - if (needsConversion(input.getType, internalType)) { + val hasTimeIndicator = fieldIdxs.exists(f => + f == TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER || + f == TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER) - val generator = new FunctionCodeGenerator( - config, - false, - inputType, - None, - Some(flinkTable.fieldIndexes)) + if 
(input.getType == cRowType && !hasTimeIndicator) {
+      // input is already a CRow with correct type
+      input.asInstanceOf[DataStream[CRow]]
 
-    val conversion = generator.generateConverterResultExpression(
-      schema.typeInfo,
-      schema.fieldNames)
+    } else if (input.getType == internalType && !hasTimeIndicator) {
+      // input is already of correct type. Only need to wrap it as CRow
+      input.asInstanceOf[DataStream[Row]].map(new RichMapFunction[Row, CRow] {
+        @transient private var outCRow: CRow = null
+
+        override def open(parameters: Configuration): Unit = {
+          outCRow = new CRow(null, change = true)
+        }
-    val body =
-      s"""
-         |${conversion.code}
-         |${generator.collectorTerm}.collect(${conversion.resultTerm});
-         |""".stripMargin
+        override def map(v: Row): CRow = {
+          outCRow.row = v
+          outCRow
+        }
+      }).returns(cRowType)
 
-    val function = generator.generateFunction(
+    } else {
+      // input needs to be converted and wrapped as CRow or time indicators need to be generated
+
+      val function = generateConversionProcessFunction(
+        config,
+        inputType,
+        internalType,
         "DataStreamSourceConversion",
-      classOf[ProcessFunction[Any, Row]],
-      body,
-      schema.typeInfo)
+        schema.fieldNames,
+        fieldIdxs,
+        rowtimeExpression
+      )
 
       val processFunc = new CRowOutputProcessRunner(
         function.name,
         function.code,
-      internalType)
+        cRowType)
 
-    val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
+      val opName = s"from: (${schema.fieldNames.mkString(", ")})"
 
-    input.process(processFunc).name(opName).returns(internalType)
-  }
-  // no conversion necessary, forward
-  else {
-    input.asInstanceOf[DataStream[CRow]]
+      input.process(processFunc).name(opName).returns(cRowType)
     }
   }
+
+  private def generateConversionProcessFunction(
+      config: TableConfig,
+      inputType: TypeInformation[Any],
+      outputType: TypeInformation[Row],
+      conversionOperatorName: String,
+      fieldNames: Seq[String],
+      inputFieldMapping: Array[Int],
+      rowtimeExpression: Option[RexNode]): GeneratedFunction[ProcessFunction[Any, Row], Row] = {
+
+    val generator = new FunctionCodeGenerator(
+      config,
+      false,
+      inputType,
+      None,
+      Some(inputFieldMapping))
+
+    val conversion = generator.generateConverterResultExpression(
+      outputType,
+      fieldNames,
+      rowtimeExpression)
+
+    val body =
+      s"""
+         |${conversion.code}
+         |${generator.collectorTerm}.collect(${conversion.resultTerm});
+         |""".stripMargin
+
+    generator.generateFunction(
+      "DataStreamSourceConversion",
+      classOf[ProcessFunction[Any, Row]],
+      body,
+      outputType)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 937f9c1..46acd6c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -22,28 +22,35 @@ import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rex.RexNode
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment}
+import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks}
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
-import org.apache.flink.table.plan.schema.{RowSchema, StreamTableSourceTable}
+import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.runtime.types.CRow
-import org.apache.flink.table.sources.{StreamTableSource, TableSource}
+import org.apache.flink.table.sources._
+import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 
 /** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
 class StreamTableSourceScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
-    tableSource: StreamTableSource[_])
-  extends PhysicalTableSourceScan(cluster, traitSet, table, tableSource)
+    tableSource: StreamTableSource[_],
+    selectedFields: Option[Array[Int]])
+  extends PhysicalTableSourceScan(cluster, traitSet, table, tableSource, selectedFields)
   with StreamScan {
 
   override def deriveRowType(): RelDataType = {
-
-    StreamTableSourceTable.deriveRowTypeOfTableSource(
-      tableSource.asInstanceOf[StreamTableSource[_]],
-      cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory])
+    val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    TableSourceUtil.getRelDataType(
+      tableSource,
+      selectedFields,
+      streaming = true,
+      flinkTypeFactory)
   }
 
   override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
@@ -56,20 +63,21 @@ class StreamTableSourceScan(
       cluster,
       traitSet,
       getTable,
-      tableSource
+      tableSource,
+      selectedFields
     )
   }
 
   override def copy(
       traitSet: RelTraitSet,
-      newTableSource: TableSource[_])
-    : PhysicalTableSourceScan = {
+      newTableSource: TableSource[_]): PhysicalTableSourceScan = {
 
     new StreamTableSourceScan(
       cluster,
       traitSet,
       getTable,
-      newTableSource.asInstanceOf[StreamTableSource[_]]
+      newTableSource.asInstanceOf[StreamTableSource[_]],
+      selectedFields
     )
   }
 
@@ -77,12 +85,101 @@ class StreamTableSourceScan(
       tableEnv: StreamTableEnvironment,
       queryConfig: StreamQueryConfig): DataStream[CRow] = {
 
+    val fieldIndexes = TableSourceUtil.computeIndexMapping(
+      tableSource,
+      isStreamTable = true,
+      selectedFields)
+
     val config = tableEnv.getConfig
     val inputDataStream = tableSource.getDataStream(tableEnv.execEnv).asInstanceOf[DataStream[Any]]
 
-    convertToInternalRow(
-      new RowSchema(getRowType),
+    val outputSchema = new RowSchema(this.getRowType)
+
+    // check that declared and actual type of table source DataStream are identical
+    if (inputDataStream.getType != tableSource.getReturnType) {
+      throw new TableException(s"TableSource of type ${tableSource.getClass.getCanonicalName} " +
+        s"returned a DataStream of type ${inputDataStream.getType} that does not match with the " +
+        s"type ${tableSource.getReturnType} declared by the TableSource.getReturnType() method. " +
+        s"Please validate the implementation of the TableSource.")
+    }
+
+    // get expression to extract rowtime attribute
+    val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression(
+      tableSource,
+      selectedFields,
+      cluster,
+      tableEnv.getRelBuilder,
+      TimeIndicatorTypeInfo.ROWTIME_INDICATOR
+    )
+
+    // ingest table and convert and extract time attributes if necessary
+    val ingestedTable = convertToInternalRow(
+      outputSchema,
       inputDataStream,
-      new StreamTableSourceTable(tableSource),
-      config)
+      fieldIndexes,
+      config,
+      rowtimeExpression)
+
+    // generate watermarks for rowtime indicator
+    val rowtimeDesc: Option[RowtimeAttributeDescriptor] =
+      TableSourceUtil.getRowtimeAttributeDescriptor(tableSource, selectedFields)
+
+    val withWatermarks = if (rowtimeDesc.isDefined) {
+      val rowtimeFieldIdx = outputSchema.fieldNames.indexOf(rowtimeDesc.get.getAttributeName)
+      val watermarkStrategy = rowtimeDesc.get.getWatermarkStrategy
+      watermarkStrategy match {
+        case p: PeriodicWatermarkAssigner =>
+          val watermarkGenerator = new PeriodicWatermarkAssignerWrapper(rowtimeFieldIdx, p)
+          ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
+        case p: PunctuatedWatermarkAssigner =>
+          val watermarkGenerator = new PunctuatedWatermarkAssignerWrapper(rowtimeFieldIdx, p)
+          ingestedTable.assignTimestampsAndWatermarks(watermarkGenerator)
+      }
+    } else {
+      // No need to generate watermarks if no rowtime attribute is specified.
+      ingestedTable
+    }
+
+    withWatermarks
+  }
+}
+
+/**
+  * Generates periodic watermarks based on a [[PeriodicWatermarkAssigner]].
+  *
+  * @param timeFieldIdx the index of the rowtime attribute.
+  * @param assigner the watermark assigner.
+  */
+private class PeriodicWatermarkAssignerWrapper(
+    timeFieldIdx: Int,
+    assigner: PeriodicWatermarkAssigner)
+  extends AssignerWithPeriodicWatermarks[CRow] {
+
+  override def getCurrentWatermark: Watermark = assigner.getWatermark
+
+  override def extractTimestamp(crow: CRow, previousElementTimestamp: Long): Long = {
+    val timestamp: Long = crow.row.getField(timeFieldIdx).asInstanceOf[Long]
+    assigner.nextTimestamp(timestamp)
+    0L
+  }
+}
+
+/**
+  * Generates punctuated watermarks based on a [[PunctuatedWatermarkAssigner]].
+  *
+  * @param timeFieldIdx the index of the rowtime attribute.
+  * @param assigner the watermark assigner.
+  */
+private class PunctuatedWatermarkAssignerWrapper(
+    timeFieldIdx: Int,
+    assigner: PunctuatedWatermarkAssigner)
+  extends AssignerWithPunctuatedWatermarks[CRow] {
+
+  override def checkAndGetNextWatermark(crow: CRow, ts: Long): Watermark = {
+    val timestamp: Long = crow.row.getField(timeFieldIdx).asInstanceOf[Long]
+    assigner.getWatermark(crow.row, timestamp)
+  }
+
+  override def extractTimestamp(element: CRow, previousElementTimestamp: Long): Long = {
+    0L
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
index b009cf1..337025f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
@@ -25,11 +25,11 @@ import org.apache.calcite.rel.core.TableScan
 import org.apache.calcite.rel.logical.LogicalTableScan
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter}
-import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.FlinkConventions
-import org.apache.flink.table.plan.schema.{StreamTableSourceTable, TableSourceTable}
-import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource, TableSource}
+import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable, TableSourceTable}
+import org.apache.flink.table.sources.{TableSource, TableSourceUtil}
 
 import scala.collection.JavaConverters._
 
@@ -37,27 +37,27 @@ class FlinkLogicalTableSourceScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
-    val tableSource: TableSource[_])
+    val tableSource: TableSource[_],
+    val selectedFields: Option[Array[Int]])
   extends TableScan(cluster, traitSet, table)
   with FlinkLogicalRel {
 
-  def copy(traitSet: RelTraitSet, tableSource: TableSource[_]): FlinkLogicalTableSourceScan = {
-    new FlinkLogicalTableSourceScan(cluster, traitSet, getTable, tableSource)
+  def copy(
+      traitSet: RelTraitSet,
+      tableSource: TableSource[_],
+      selectedFields: Option[Array[Int]]): FlinkLogicalTableSourceScan = {
+    new FlinkLogicalTableSourceScan(cluster, traitSet, getTable, tableSource, selectedFields)
   }
 
   override def deriveRowType(): RelDataType = {
     val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-
-    tableSource match {
-      case s: StreamTableSource[_] =>
-        StreamTableSourceTable.deriveRowTypeOfTableSource(s, flinkTypeFactory)
-      case _: BatchTableSource[_] =>
-        val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
-        val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
-        flinkTypeFactory.buildLogicalRowType(fieldNames, fieldTypes)
-      case _ => throw new TableException("Unknown TableSource type.")
+    val streamingTable = table.unwrap(classOf[TableSourceTable[_]]) match {
+      case _: StreamTableSourceTable[_] => true
+      case _: BatchTableSourceTable[_] => false
+      case t => throw TableException(s"Unknown Table type ${t.getClass}.")
     }
+
+    TableSourceUtil.getRelDataType(tableSource, selectedFields, streamingTable, flinkTypeFactory)
   }
 
   override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
@@ -67,7 +67,7 @@ class FlinkLogicalTableSourceScan(
 
   override def explainTerms(pw: RelWriter): RelWriter = {
     val terms = super.explainTerms(pw)
-      .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
+      .item("fields", tableSource.getTableSchema.getColumnNames.mkString(", "))
 
     val sourceDesc = tableSource.explainSource()
     if (sourceDesc.nonEmpty) {
@@ -115,7 +115,8 @@ class FlinkLogicalTableSourceScanConverter
       rel.getCluster,
       traitSet,
       scan.getTable,
-      tableSource
+      tableSource,
+      None
     )
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
index 649433e..32ca127 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
@@ -59,7 +59,8 @@ class BatchTableSourceScanRule
       rel.getCluster,
       traitSet,
       scan.getTable,
-      scan.tableSource.asInstanceOf[BatchTableSource[_]]
+      scan.tableSource.asInstanceOf[BatchTableSource[_]],
+      scan.selectedFields
     )
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
index 10cb1f7..fca1557 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/StreamTableSourceScanRule.scala
@@ -61,7 +61,8 @@ class StreamTableSourceScanRule
       rel.getCluster,
       traitSet,
       scan.getTable,
-      scan.tableSource.asInstanceOf[StreamTableSource[_]]
+      scan.tableSource.asInstanceOf[StreamTableSource[_]],
+      scan.selectedFields
     )
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
index ae6129e..e87fa35 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
@@ -99,7 +99,7 @@ class PushFilterIntoTableSourceScanRule extends RelOptRule(
 
     // check whether we still need a RexProgram. An RexProgram is needed when either
     // projection or filter exists.
-    val newScan = scan.copy(scan.getTraitSet, newTableSource)
+    val newScan = scan.copy(scan.getTraitSet, newTableSource, scan.selectedFields)
     val newRexProgram = {
       if (remainingCondition != null || !program.projectsOnlyIdentity) {
         val expandedProjectList = program.getProjectList.asScala

http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
index 503badc..3ea97ab 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/PushProjectIntoTableSourceScanRule.scala
@@ -20,9 +20,10 @@ package org.apache.flink.table.plan.rules.logical
 
 import org.apache.calcite.plan.RelOptRule.{none, operand}
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.flink.table.api.TableException
 import org.apache.flink.table.plan.util.{RexProgramExtractor, RexProgramRewriter}
 import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalTableSourceScan}
-import org.apache.flink.table.sources.{DefinedProctimeAttribute, DefinedRowtimeAttribute, NestedFieldsProjectableTableSource, ProjectableTableSource}
+import org.apache.flink.table.sources._
 
 class PushProjectIntoTableSourceScanRule extends RelOptRule(
   operand(classOf[FlinkLogicalCalc],
@@ -31,39 +32,50 @@ class PushProjectIntoTableSourceScanRule extends RelOptRule(
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val scan: FlinkLogicalTableSourceScan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
-    scan.tableSource match {
-      // projection pushdown is not supported for sources that provide time indicators
-      case r: DefinedRowtimeAttribute if r.getRowtimeAttribute != null => false
-      case p: DefinedProctimeAttribute if p.getProctimeAttribute != null => false
-      case _: ProjectableTableSource[_] => true
-      case _ => false
-    }
+
+    // only continue if we haven't pushed down a projection yet.
+    scan.selectedFields.isEmpty
   }
 
   override def onMatch(call: RelOptRuleCall) {
     val calc: FlinkLogicalCalc = call.rel(0).asInstanceOf[FlinkLogicalCalc]
     val scan: FlinkLogicalTableSourceScan = call.rel(1).asInstanceOf[FlinkLogicalTableSourceScan]
 
-    val usedFields = RexProgramExtractor.extractRefInputFields(calc.getProgram)
-
-    // if no fields can be projected, we keep the original plan.
     val source = scan.tableSource
-    if (scan.getRowType.getFieldCount != usedFields.length) {
+
+    val accessedLogicalFields = RexProgramExtractor.extractRefInputFields(calc.getProgram)
+    val accessedPhysicalFields = TableSourceUtil.getPhysicalIndexes(source, accessedLogicalFields)
+
+    // only continue if fields are projected or reordered.
+    // eager reordering can remove a calc operator.
+    if (!(0 until scan.getRowType.getFieldCount).toArray.sameElements(accessedLogicalFields)) {
+
+      // try to push projection of physical fields to TableSource
       val newTableSource = source match {
         case nested: NestedFieldsProjectableTableSource[_] =>
           val nestedFields = RexProgramExtractor
-            .extractRefNestedInputFields(calc.getProgram, usedFields)
-          nested.projectNestedFields(usedFields, nestedFields)
+            .extractRefNestedInputFields(calc.getProgram, accessedPhysicalFields)
+          nested.projectNestedFields(accessedPhysicalFields, nestedFields)
         case projecting: ProjectableTableSource[_] =>
-          projecting.projectFields(usedFields)
+          projecting.projectFields(accessedPhysicalFields)
+        case nonProjecting: TableSource[_] =>
+          // projection cannot be pushed to TableSource
+          nonProjecting
+      }
+
+      // check that table schema of the new table source is identical to original
+      if (source.getTableSchema != newTableSource.getTableSchema) {
+        throw new TableException("TableSchema of ProjectableTableSource must not be modified " +
+          "by projectFields() call. This is a bug in the implementation of the TableSource " +
+          s"${source.getClass.getCanonicalName}.")
       }
 
-      val newScan = scan.copy(scan.getTraitSet, newTableSource)
+      // Apply the projection during the input conversion of the scan.
+      val newScan = scan.copy(scan.getTraitSet, newTableSource, Some(accessedLogicalFields))
       val newCalcProgram = RexProgramRewriter.rewriteWithFieldProjection(
         calc.getProgram,
         newScan.getRowType,
         calc.getCluster.getRexBuilder,
-        usedFields)
+        accessedLogicalFields)
 
       if (newCalcProgram.isTrivial) {
         // drop calc if the transformed program merely returns its input and doesn't exist filter
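Editor's note: the two assigner wrappers added in StreamTableSourceScan.scala follow Flink's standard watermarking pattern: a timestamp is extracted from each record, and a watermark is derived from what has been seen so far. As background, here is a dependency-free sketch of the periodic, bounded-out-of-orderness variant of that pattern. The class and method names below are illustrative only (they are not Flink API); the real periodic strategy ships as `PeriodicWatermarkAssigner` and is adapted by `PeriodicWatermarkAssignerWrapper` above.

```java
// Sketch of the periodic-watermark pattern: nextTimestamp() is called once
// per record (like extractTimestamp() in the wrapper), and getWatermark()
// is called periodically (like getCurrentWatermark()). The watermark trails
// the maximum observed timestamp by a fixed out-of-orderness delay.
// All names are illustrative; this is not Flink's actual API.
public class BoundedOutOfOrderSketch {
    private final long delayMillis;
    private long maxTimestamp = Long.MIN_VALUE;

    public BoundedOutOfOrderSketch(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    // Track the maximum timestamp seen so far.
    public void nextTimestamp(long timestamp) {
        if (timestamp > maxTimestamp) {
            maxTimestamp = timestamp;
        }
    }

    // Watermark = max seen timestamp minus the allowed lateness;
    // Long.MIN_VALUE until the first record arrives.
    public long getWatermark() {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - delayMillis;
    }
}
```

Note that, as in the wrapper above, the record's own timestamp drives the watermark; out-of-order records never move it backwards.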

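Editor's note: the projection push-down rule above distinguishes logical field indexes (positions in the table schema, which may include proctime/rowtime attributes) from physical indexes (positions in the type the TableSource actually produces). The real mapping lives in `TableSourceUtil.getPhysicalIndexes`, which is not part of this diff; the following dependency-free sketch illustrates the idea under the assumption that generated time attributes are marked with a physical index of -1. All names here are illustrative, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a logical-to-physical index mapping for projection push-down:
// time attribute columns exist only in the logical schema, so they are
// skipped when computing the physical fields to request from the source.
public class ProjectionMappingSketch {

    // physicalIndexOf[i] = physical index of logical field i,
    // or -1 for a generated time attribute with no physical counterpart.
    public static int[] toPhysicalIndexes(int[] accessedLogical, int[] physicalIndexOf) {
        List<Integer> result = new ArrayList<>();
        for (int logical : accessedLogical) {
            int physical = physicalIndexOf[logical];
            if (physical >= 0) { // skip proctime/rowtime markers
                result.add(physical);
            }
        }
        int[] out = new int[result.size()];
        for (int i = 0; i < out.length; i++) {
            out[i] = result.get(i);
        }
        return out;
    }
}
```

For a schema `[a, rowtime, b]` where `a` and `b` are physical fields 0 and 1 and `rowtime` is generated, accessing all three logical fields yields the physical projection `[0, 1]`: only `a` and `b` are requested from the source, while the rowtime attribute is re-created during the scan's input conversion.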