This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4946cf4d6ab0f502f065825e157c7cabde604fe1
Author: Jark Wu <[email protected]>
AuthorDate: Mon Jun 10 17:34:38 2019 +0800

    [FLINK-12708][table] Introduce InputFormatTableSource and make the blink & flink planners support it

    1. Add an isBounded() method to StreamTableSource, which returns false by default
    2. Introduce InputFormatTableSource, which extends StreamTableSource and always returns true from isBounded()
    3. InputFormatTableSource only exposes the getInputFormat() method
---
 .../flink/table/sources/BatchTableSource.java      |  3 +
 ...ableSource.java => InputFormatTableSource.java} | 30 +++++++---
 .../flink/table/sources/StreamTableSource.java     |  8 +++
 flink-table/flink-table-planner-blink/pom.xml      |  6 ++
 .../flink/table/api/BatchTableEnvironment.scala    | 69 ++++++++++++----------
 .../flink/table/api/StreamTableEnvironment.scala   | 67 +++++++++++----------
 .../physical/batch/BatchExecTableSourceScan.scala  | 13 ++--
 .../batch/BatchExecScanTableSourceRule.scala       |  6 +-
 .../table/plan/schema/BatchTableSourceTable.scala  | 62 -------------------
 .../table/plan/schema/StreamTableSourceTable.scala | 62 -------------------
 .../table/plan/schema/TableSourceSinkTable.scala   |  6 +-
 .../flink/table/plan/schema/TableSourceTable.scala | 34 +++++++++--
 .../flink/table/sources/BatchTableSource.scala     | 38 ------------
 .../flink/table/sources/StreamTableSource.scala    | 38 ------------
 .../plan/stream/sql/join/LookupJoinTest.scala      | 14 +----
 .../table/runtime/batch/sql/TableScanITCase.scala  | 13 ++--
 .../batch/sql/agg/WindowAggregateITCase.scala      | 20 +++----
 .../table/runtime/stream/sql/TableScanITCase.scala |  2 +-
 .../utils/InMemoryLookupableTableSource.scala      |  5 --
 .../apache/flink/table/util/TableTestBase.scala    | 43 ++++++--------
 .../apache/flink/table/util/testTableSources.scala | 10 ++--
 .../apache/flink/table/api/BatchTableEnvImpl.scala | 16 +++--
 .../flink/table/api/StreamTableEnvImpl.scala       |  6 +-
 .../plan/nodes/dataset/BatchTableSourceScan.scala  | 23 +++++++-
 .../rules/dataSet/BatchTableSourceScanRule.scala   |  2 +-
 .../api/validation/TableSourceValidationTest.scala | 23 +++++++-
 .../runtime/batch/table/TableSourceITCase.scala    | 33 +++++++++++
 .../flink/table/utils/testTableSources.scala       | 24 +++++++-
 28 files changed, 306 insertions(+), 370 deletions(-)

diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/BatchTableSource.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/BatchTableSource.java
index 75b421e..c266d87 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/BatchTableSource.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/BatchTableSource.java
@@ -24,7 +24,10 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 /**
  * Defines an external batch table and provides access to its data.
  *
  * @param <T> Type of the {@link DataSet} created by this {@link TableSource}.
+ *
+ * @deprecated use {@link InputFormatTableSource} instead.
  */
+@Deprecated
 public interface BatchTableSource<T> extends TableSource<T> {

 	/**
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/StreamTableSource.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/InputFormatTableSource.java
similarity index 56%
copy from flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/StreamTableSource.java
copy to flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/InputFormatTableSource.java
index 67ed55f..79fc55a 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/StreamTableSource.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/InputFormatTableSource.java
@@ -18,20 +18,34 @@

 package org.apache.flink.table.sources;

+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

 /**
- * Defines an external stream table and provides read access to its data.
+ * Defines an external bounded table and provides access to its data.
  *
- * @param <T> Type of the {@link DataStream} created by this {@link TableSource}.
+ * @param <T> Type of the bounded {@link InputFormat} created by this {@link TableSource}.
  */
-public interface StreamTableSource<T> extends TableSource<T> {
+@Experimental
+public abstract class InputFormatTableSource<T> implements StreamTableSource<T> {

 	/**
-	 * Returns the data of the table as a {@link DataStream}.
-	 *
-	 * <p>NOTE: This method is for internal use only for defining a {@link TableSource}.
-	 * Do not use it in Table API programs.
+	 * Returns an {@link InputFormat} for reading the data of the table.
 	 */
-	DataStream<T> getDataStream(StreamExecutionEnvironment execEnv);
+	public abstract InputFormat<T, ?> getInputFormat();
+
+	/**
+	 * Always returns true, which indicates this is a bounded source.
+	 */
+	@Override
+	public final boolean isBounded() {
+		return true;
+	}
+
+	@Override
+	public final DataStream<T> getDataStream(StreamExecutionEnvironment execEnv) {
+		return execEnv.createInput(getInputFormat(), getReturnType());
+	}
 }
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/StreamTableSource.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/StreamTableSource.java
index 67ed55f..4ec5dac 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/StreamTableSource.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sources/StreamTableSource.java
@@ -28,6 +28,14 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 public interface StreamTableSource<T> extends TableSource<T> {

 	/**
+	 * Returns true if this is a bounded source, false if this is an unbounded source.
+	 * Default is unbounded for compatibility.
+	 */
+	default boolean isBounded() {
+		return false;
+	}
+
+	/**
 	 * Returns the data of the table as a {@link DataStream}.
 	 *
 	 * <p>NOTE: This method is for internal use only for defining a {@link TableSource}.
 	 * Do not use it in Table API programs.
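Before the planner changes below, it may help to see what a user-facing implementation of the new abstract class could look like. The following is a minimal sketch, not part of this commit; the class name, the sample rows, and the choice of CollectionInputFormat are assumptions for illustration:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.io.InputFormat;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.io.CollectionInputFormat;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.api.Types;
    import org.apache.flink.table.sources.InputFormatTableSource;
    import org.apache.flink.types.Row;

    // Hypothetical example: a bounded table source backed by an in-memory collection.
    // Only getInputFormat(), getReturnType() and getTableSchema() need to be provided;
    // isBounded() and getDataStream() are final in InputFormatTableSource.
    public class CollectionTableSource extends InputFormatTableSource<Row> {

        private final RowTypeInfo rowType = new RowTypeInfo(
            new TypeInformation<?>[]{Types.STRING(), Types.LONG()},
            new String[]{"name", "amount"});

        private final List<Row> data = Arrays.asList(
            Row.of("Mary", 10L),
            Row.of("Bob", 20L));

        @Override
        public InputFormat<Row, ?> getInputFormat() {
            // CollectionInputFormat is a bounded, non-parallel input format
            return new CollectionInputFormat<>(
                data, rowType.createSerializer(new ExecutionConfig()));
        }

        @Override
        public TypeInformation<Row> getReturnType() {
            return rowType;
        }

        @Override
        public TableSchema getTableSchema() {
            return new TableSchema(rowType.getFieldNames(), rowType.getFieldTypes());
        }
    }

The Scala TestInputFormatTableSource added to the planner tests at the end of this diff follows the same shape.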
diff --git a/flink-table/flink-table-planner-blink/pom.xml b/flink-table/flink-table-planner-blink/pom.xml
index 7715207..888c34c 100644
--- a/flink-table/flink-table-planner-blink/pom.xml
+++ b/flink-table/flink-table-planner-blink/pom.xml
@@ -96,6 +96,12 @@ under the License.

 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 		</dependency>
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 79ad59c..b8cfae7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -30,7 +30,7 @@ import org.apache.flink.table.plan.nodes.process.DAGProcessContext
 import org.apache.flink.table.plan.nodes.resource.batch.parallelism.BatchParallelismProcessor
 import org.apache.flink.table.plan.optimize.{BatchCommonSubGraphBasedOptimizer, Optimizer}
 import org.apache.flink.table.plan.reuse.DeadlockBreakupProcessor
-import org.apache.flink.table.plan.schema.{BatchTableSourceTable, TableSourceSinkTable, TableSourceTable}
+import org.apache.flink.table.plan.schema.{TableSourceSinkTable, TableSourceTable}
 import org.apache.flink.table.plan.stats.FlinkStatistic
 import org.apache.flink.table.plan.util.{ExecNodePlanDumper, FlinkRelOptUtil}
 import org.apache.flink.table.sinks._
@@ -252,8 +252,8 @@ class BatchTableEnvironment(
   }

   /**
-    * Registers an internal [[BatchTableSource]] in this [[TableEnvironment]]'s catalog without
-    * name checking. Registered tables can be referenced in SQL queries.
+    * Registers an internal bounded [[StreamTableSource]] in this [[TableEnvironment]]'s catalog
+    * without name checking. Registered tables can be referenced in SQL queries.
     *
     * @param name The name under which the [[TableSource]] is registered.
     * @param tableSource The [[TableSource]] to register.
@@ -265,40 +265,47 @@ class BatchTableEnvironment(
       statistic: FlinkStatistic,
       replace: Boolean = false): Unit = {

-    tableSource match {
+    def register(): Unit = {
+      // check if a table (source or sink) is registered
+      getTable(name) match {
+
+        // table source and/or sink is registered
+        case Some(table: TableSourceSinkTable[_, _]) => table.tableSourceTable match {

-      // check for proper batch table source
-      case batchTableSource: BatchTableSource[_] =>
-        // check if a table (source or sink) is registered
-        getTable(name) match {
-
-          // table source and/or sink is registered
-          case Some(table: TableSourceSinkTable[_, _]) => table.tableSourceTable match {
-
-            // wrapper contains source
-            case Some(_: TableSourceTable[_]) if !replace =>
-              throw new TableException(s"Table '$name' already exists. " +
-                s"Please choose a different name.")
-
-            // wrapper contains only sink (not source)
-            case _ =>
-              val enrichedTable = new TableSourceSinkTable(
-                Some(new BatchTableSourceTable(batchTableSource, statistic)),
-                table.tableSinkTable)
-              replaceRegisteredTable(name, enrichedTable)
-          }
-
-          // no table is registered
+          // wrapper contains source
+          case Some(_: TableSourceTable[_]) if !replace =>
+            throw new TableException(s"Table '$name' already exists. " +
+              s"Please choose a different name.")
+
+          // wrapper contains only sink (not source)
           case _ =>
-            val newTable = new TableSourceSinkTable(
-              Some(new BatchTableSourceTable(batchTableSource, statistic)),
-              None)
-            registerTableInternal(name, newTable)
+            val enrichedTable = new TableSourceSinkTable(
+              Some(new TableSourceTable(tableSource, false, statistic)),
+              table.tableSinkTable)
+            replaceRegisteredTable(name, enrichedTable)
         }

+        // no table is registered
+        case _ =>
+          val newTable = new TableSourceSinkTable(
+            Some(new TableSourceTable(tableSource, false, statistic)),
+            None)
+          registerTableInternal(name, newTable)
+      }
+    }
+
+    tableSource match {
+
+      // check for proper batch (bounded) table source
+      case boundedTableSource: StreamTableSource[_] if boundedTableSource.isBounded =>
+        register()
+
+      // a lookupable table source can also be registered in the env
+      case _: LookupableTableSource[_] =>
+        register()
+
       // not a batch table source
       case _ =>
-        throw new TableException("Only BatchTableSource can be " +
+        throw new TableException("Only LookupableTableSource and bounded StreamTableSource can be " +
           "registered in BatchTableEnvironment.")
     }
   }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 2f31ef3..c62ff18 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -37,13 +37,12 @@ import org.apache.flink.table.plan.schema._
 import org.apache.flink.table.plan.stats.FlinkStatistic
 import org.apache.flink.table.plan.util.{ExecNodePlanDumper, FlinkRelOptUtil}
 import org.apache.flink.table.sinks.DataStreamTableSink
-import org.apache.flink.table.sources.{StreamTableSource, TableSource}
+import org.apache.flink.table.sources.{LookupableTableSource, StreamTableSource, TableSource}
 import org.apache.flink.table.types.{DataType, LogicalTypeDataTypeConverter}
 import org.apache.flink.table.types.logical.{LogicalType, RowType}
 import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
 import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TypeCheckUtils}
 import org.apache.flink.table.util.PlanUtil
-
 import org.apache.calcite.plan.{ConventionTraitDef, RelTrait, RelTraitDef}
 import org.apache.calcite.sql.SqlExplainLevel

@@ -507,41 +506,49 @@ abstract class StreamTableEnvironment(
     //   case _ => // ok
     //}

+    def register(): Unit = {
+      // register
+      getTable(name) match {
+
+        // check if a table (source or sink) is registered
+        case Some(table: TableSourceSinkTable[_, _]) => table.tableSourceTable match {
+
+          // wrapper contains source
+          case Some(_: TableSourceTable[_]) if !replace =>
+            throw new TableException(s"Table '$name' already exists. " +
+              s"Please choose a different name.")
+
+          // wrapper contains only sink (not source)
+          case Some(_: TableSourceTable[_]) =>
+            val enrichedTable = new TableSourceSinkTable(
+              Some(new TableSourceTable(tableSource, true, statistic)),
+              table.tableSinkTable)
+            replaceRegisteredTable(name, enrichedTable)
+        }
+
+        // no table is registered
+        case _ =>
+          val newTable = new TableSourceSinkTable(
+            Some(new TableSourceTable(tableSource, true, statistic)),
+            None)
+          registerTableInternal(name, newTable)
+      }
+    }
+
     tableSource match {

       // check for proper stream table source
-      case streamTableSource: StreamTableSource[_] =>
-        // register
-        getTable(name) match {
-
-          // check if a table (source or sink) is registered
-          case Some(table: TableSourceSinkTable[_, _]) => table.tableSourceTable match {
-
-            // wrapper contains source
-            case Some(_: TableSourceTable[_]) if !replace =>
-              throw new TableException(s"Table '$name' already exists. " +
-                s"Please choose a different name.")
-
-            // wrapper contains only sink (not source)
-            case Some(_: StreamTableSourceTable[_]) =>
-              val enrichedTable = new TableSourceSinkTable(
-                Some(new StreamTableSourceTable(streamTableSource)),
-                table.tableSinkTable)
-              replaceRegisteredTable(name, enrichedTable)
-          }
+      case streamTableSource: StreamTableSource[_] if !streamTableSource.isBounded =>
+        register()

-          // no table is registered
-          case _ =>
-            val newTable = new TableSourceSinkTable(
-              Some(new StreamTableSourceTable(streamTableSource)),
-              None)
-            registerTableInternal(name, newTable)
-        }
+      // a lookupable table source can also be registered in the env
+      case _: LookupableTableSource[_] =>
+        register()

       // not a stream table source
       case _ =>
-        throw new TableException(
-          "Only StreamTableSource can be registered in StreamTableEnvironment")
+        throw new TableException("Only LookupableTableSource and unbounded StreamTableSource " +
+          "can be registered in StreamTableEnvironment")
     }
   }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
index ce0a826..6ed27dd 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecTableSourceScan.scala
@@ -34,7 +34,7 @@ import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
 import org.apache.flink.table.plan.nodes.physical.PhysicalTableSourceScan
 import org.apache.flink.table.plan.schema.FlinkRelOptTable
 import org.apache.flink.table.plan.util.ScanUtil
-import org.apache.flink.table.sources.{BatchTableSource, TableSourceUtil}
+import org.apache.flink.table.sources.{StreamTableSource, TableSourceUtil}

 import org.apache.calcite.plan._
 import org.apache.calcite.rel.RelNode
@@ -47,7 +47,8 @@ import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataTy
 import scala.collection.JavaConversions._

 /**
- * Batch physical RelNode to read data from an external source defined by a [[BatchTableSource]].
+ * Batch physical RelNode to read data from an external source defined by a
+ * bounded [[StreamTableSource]].
  */
 class BatchExecTableSourceScan(
     cluster: RelOptCluster,
@@ -87,8 +88,8 @@ class BatchExecTableSourceScan(
   override def translateToPlanInternal(
       tableEnv: BatchTableEnvironment): StreamTransformation[BaseRow] = {
     val config = tableEnv.getConfig
-    val bts = tableSource.asInstanceOf[BatchTableSource[_]]
-    val inputTransform = bts.getBoundedStream(tableEnv.execEnv).getTransformation
+    val bts = tableSource.asInstanceOf[StreamTableSource[_]] // bounded table source
+    val inputTransform = bts.getDataStream(tableEnv.execEnv).getTransformation
     inputTransform.setParallelism(getResource.getParallelism)

     val fieldIndexes = TableSourceUtil.computeIndexMapping(
@@ -141,13 +142,13 @@ class BatchExecTableSourceScan(
     ScanUtil.needsConversion(
       tableSource.getProducedDataType,
       TypeExtractor.createTypeInfo(
-        tableSource, classOf[BatchTableSource[_]], tableSource.getClass, 0)
+        tableSource, classOf[StreamTableSource[_]], tableSource.getClass, 0)
         .getTypeClass.asInstanceOf[Class[_]])
   }

   def getSourceTransformation(
       streamEnv: StreamExecutionEnvironment): StreamTransformation[_] = {
-    tableSource.asInstanceOf[BatchTableSource[_]].getBoundedStream(streamEnv).getTransformation
+    tableSource.asInstanceOf[StreamTableSource[_]].getDataStream(streamEnv).getTransformation
   }

   def getEstimatedRowCount: lang.Double = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecScanTableSourceRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecScanTableSourceRule.scala
index 8c3e698..3021c56 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecScanTableSourceRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecScanTableSourceRule.scala
@@ -26,7 +26,7 @@ import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.logical.FlinkLogicalTableSourceScan
 import org.apache.flink.table.plan.nodes.physical.batch.BatchExecTableSourceScan
 import org.apache.flink.table.plan.schema.{FlinkRelOptTable, TableSourceTable}
-import org.apache.flink.table.sources.BatchTableSource
+import org.apache.flink.table.sources.StreamTableSource

 /**
  * Rule that converts [[FlinkLogicalTableSourceScan]] to [[BatchExecTableSourceScan]].
@@ -38,14 +38,14 @@ class BatchExecScanTableSourceRule
     FlinkConventions.BATCH_PHYSICAL,
     "BatchExecScanTableSourceRule") {

-  /** Rule must only match if TableScan targets a [[BatchTableSource]] */
+  /** Rule must only match if TableScan targets a bounded [[StreamTableSource]] */
   override def matches(call: RelOptRuleCall): Boolean = {
     val scan: TableScan = call.rel(0).asInstanceOf[TableScan]
     val dataSetTable = scan.getTable.unwrap(classOf[TableSourceTable[_]])
     dataSetTable match {
       case tst: TableSourceTable[_] => tst.tableSource match {
-        case _: BatchTableSource[_] => true
+        case sts: StreamTableSource[_] => sts.isBounded
         case _ => false
       }
       case _ => false
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/BatchTableSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/BatchTableSourceTable.scala
deleted file mode 100644
index d01d9e9..0000000
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/BatchTableSourceTable.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.schema
-
-import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.sources.{BatchTableSource, TableSource, TableSourceUtil}
-
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
-
-/**
-  * Class which implements the logic to convert a [[BatchTableSourceTable]] to Calcite Table
-  */
-class BatchTableSourceTable[T](
-    tableSource: BatchTableSource[T],
-    statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
-  extends TableSourceTable(tableSource, statistic) {
-
-  // TODO implements this
-  // TableSourceUtil.validateTableSource(tableSource)
-
-  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
-    TableSourceUtil.getRelDataType(
-      tableSource,
-      None,
-      streaming = false,
-      typeFactory.asInstanceOf[FlinkTypeFactory])
-  }
-
-  /**
-    * Creates a copy of this table, changing statistic.
-    *
-    * @param statistic A new FlinkStatistic.
-    * @return Copy of this table, substituting statistic.
-    */
-  override def copy(statistic: FlinkStatistic) = new BatchTableSourceTable(tableSource, statistic)
-
-  /**
-    * replace table source with the given one, and create a new table source table.
-    *
-    * @param tableSource tableSource to replace.
-    * @return new TableSourceTable
-    */
-  override def replaceTableSource(tableSource: TableSource[T]) =
-    new BatchTableSourceTable(tableSource.asInstanceOf[BatchTableSource[T]], statistic)
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
deleted file mode 100644
index 1dc9ae0..0000000
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.schema
-
-import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceUtil}
-
-import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
-
-/**
-  * Class which implements the logic to convert a [[StreamTableSource]] to Calcite Table
-  */
-class StreamTableSourceTable[T](
-    tableSource: StreamTableSource[T],
-    statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
-  extends TableSourceTable(tableSource, statistic) {
-
-  // TODO implements this
-  // TableSourceUtil.validateTableSource(tableSource)
-
-  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
-    TableSourceUtil.getRelDataType(
-      tableSource,
-      None,
-      streaming = true,
-      typeFactory.asInstanceOf[FlinkTypeFactory])
-  }
-
-  /**
-    * Creates a copy of this table, changing statistic.
-    *
-    * @param statistic A new FlinkStatistic.
-    * @return Copy of this table, substituting statistic.
-    */
-  override def copy(statistic: FlinkStatistic) = new StreamTableSourceTable(tableSource, statistic)
-
-  /**
-    * replace table source with the given one, and create a new table source table.
-    *
-    * @param tableSource tableSource to replace.
-    * @return new TableSourceTable
-    */
-  override def replaceTableSource(tableSource: TableSource[T]) =
-    new StreamTableSourceTable(tableSource.asInstanceOf[StreamTableSource[T]], statistic)
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceSinkTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceSinkTable.scala
index 0ed343f..c421d78 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceSinkTable.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceSinkTable.scala
@@ -56,18 +56,18 @@ class TableSourceSinkTable[T1, T2](
   def isSourceTable: Boolean = tableSourceTable.isDefined

   def isStreamSourceTable: Boolean = tableSourceTable match {
-    case Some(_: StreamTableSourceTable[_]) => true
+    case Some(tst) => tst.isStreaming
     case _ => false
   }

   def isBatchSourceTable: Boolean = tableSourceTable match {
-    case Some(_: BatchTableSourceTable[_]) => true
+    case Some(tst) => !tst.isStreaming
     case _ => false
   }

   override def copy(statistic: FlinkStatistic): FlinkTable = {
     new TableSourceSinkTable[T1, T2](
-      tableSourceTable.map(source => source.copy(statistic).asInstanceOf[TableSourceTable[T1]]),
+      tableSourceTable.map(source => source.copy(statistic)),
       tableSinkTable.map(sink => sink.copy(statistic).asInstanceOf[TableSinkTable[T2]]))
   }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
index fb6e3be..cafb2e9 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
@@ -18,18 +18,42 @@

 package org.apache.flink.table.plan.schema

+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.stats.FlinkStatistic
-import org.apache.flink.table.sources.TableSource
+import org.apache.flink.table.sources.{TableSource, TableSourceUtil}

 /**
   * Abstract class which define the interfaces required to convert a [[TableSource]] to
   * a Calcite Table
   */
-abstract class TableSourceTable[T](
+class TableSourceTable[T](
     val tableSource: TableSource[T],
-    val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
+    val isStreaming: Boolean,
+    val statistic: FlinkStatistic)
   extends FlinkTable {

+  // TODO implement this
+  // TableSourceUtil.validateTableSource(tableSource)
+
+  override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
+    TableSourceUtil.getRelDataType(
+      tableSource,
+      None,
+      streaming = isStreaming,
+      typeFactory.asInstanceOf[FlinkTypeFactory])
+  }
+
+  /**
+    * Creates a copy of this table, changing statistic.
+    *
+    * @param statistic A new FlinkStatistic.
+    * @return Copy of this table, substituting statistic.
+    */
+  override def copy(statistic: FlinkStatistic): TableSourceTable[T] = {
+    new TableSourceTable(tableSource, isStreaming, statistic)
+  }
+
   /**
     * Returns statistics of current table.
     */
@@ -41,5 +65,7 @@ abstract class TableSourceTable[T](
    * @param tableSource tableSource to replace.
    * @return new TableSourceTable
    */
-  def replaceTableSource(tableSource: TableSource[T]): TableSourceTable[T]
+  def replaceTableSource(tableSource: TableSource[T]): TableSourceTable[T] = {
+    new TableSourceTable(tableSource, isStreaming, statistic)
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/BatchTableSource.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/BatchTableSource.scala
deleted file mode 100644
index bdd5641..0000000
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/BatchTableSource.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-
-/**
-  * Defines an external batch exec table and provides access to its data.
-  *
-  * @tparam T Type of the [[DataStream]] created by this [[TableSource]].
-  */
-trait BatchTableSource[T] extends TableSource[T] {
-
-  /**
-    * Returns the data of the table as a [[DataStream]].
-    *
-    * NOTE: This method is for internal use only for defining a [[TableSource]].
-    * Do not use it in Table API programs.
-    */
-  def getBoundedStream(streamEnv: StreamExecutionEnvironment): DataStream[T]
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/StreamTableSource.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/StreamTableSource.scala
deleted file mode 100644
index 07d7fa0..0000000
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sources/StreamTableSource.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources
-
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-
-/**
-  * Defines an external stream table and provides access to its data.
-  *
-  * @tparam T Type of the [[DataStream]] created by this [[TableSource]].
-  */
-trait StreamTableSource[T] extends TableSource[T] {
-
-  /**
-    * Returns the data of the table as a [[DataStream]].
-    *
-    * NOTE: This method is for internal use only for defining a [[TableSource]].
-    * Do not use it in Table API programs.
-    */
-  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
-}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
index 2eb1cb1..9e7ce7b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
@@ -342,9 +342,7 @@ class LookupJoinTest extends TableTestBase with Serializable {

 class TestTemporalTable
-  extends StreamTableSource[BaseRow]
-  with BatchTableSource[BaseRow]
-  with LookupableTableSource[BaseRow]
+  extends LookupableTableSource[BaseRow]
   with DefinedIndexes {

   val fieldNames: Array[String] = Array("id", "name", "age")
@@ -381,16 +379,6 @@ class TestTemporalTable
       .build()
     util.Arrays.asList(index1, index2)
   }
-
-  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[BaseRow] = {
-    throw new UnsupportedOperationException("This TableSource is only used for unit test, " +
-      "this method should never be called.")
-  }
-
-  override def getBoundedStream(streamEnv: StreamExecutionEnvironment): DataStream[BaseRow] = {
-    throw new UnsupportedOperationException("This TableSource is only used for unit test, " +
-      "this method should never be called.")
-  }
 }

 class TestInvalidTemporalTable private(
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableScanITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableScanITCase.scala
index e32fce4..eab3a22 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableScanITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/TableScanITCase.scala
@@ -25,10 +25,9 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.{TableSchema, Types}
 import org.apache.flink.table.runtime.utils.BatchTestBase
 import org.apache.flink.table.runtime.utils.BatchTestBase.row
-import org.apache.flink.table.sources.BatchTableSource
+import org.apache.flink.table.sources.StreamTableSource
 import org.apache.flink.table.util.TestTableSourceWithTime
 import org.apache.flink.types.Row
-
 import org.junit.Test

 import java.lang.{Integer => JInt, Long => JLong}
@@ -42,11 +41,13 @@ class TableScanITCase extends BatchTestBase {
   def testTableSourceWithoutTimeAttribute(): Unit = {
     val tableName = "MyTable"

-    val tableSource = new BatchTableSource[Row]() {
+    val tableSource = new StreamTableSource[Row]() {
       private val fieldNames: Array[String] = Array("name", "id", "value")
       private val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.LONG, Types.INT)

-      override def getBoundedStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
+      override def isBounded: Boolean = true
+
+      override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
         val data = Seq(
           row("Mary", new JLong(1L), new JInt(1)),
           row("Bob", new JLong(2L), new JInt(3))
@@ -77,7 +78,7 @@ class TableScanITCase extends BatchTestBase {
     val schema = new TableSchema(Array("name", "ptime"), Array(Types.STRING, Types.SQL_TIMESTAMP))
     val returnType = Types.STRING

-    val tableSource = new TestTableSourceWithTime(schema, returnType, data, null, "ptime")
+    val tableSource = new TestTableSourceWithTime(true, schema, returnType, data, null, "ptime")
     tEnv.registerTableSource(tableName, tableSource)

     checkResult(
@@ -105,7 +106,7 @@ class TableScanITCase extends BatchTestBase {
       Array(Types.STRING, Types.SQL_TIMESTAMP, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
       fieldNames)

-    val tableSource = new TestTableSourceWithTime(schema, rowType, data, "rtime", null)
+    val tableSource = new TestTableSourceWithTime(true, schema, rowType, data, "rtime", null)
     tEnv.registerTableSource(tableName, tableSource)

     checkResult(
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/WindowAggregateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/WindowAggregateITCase.scala
index c399b1b..13a2ae7 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/WindowAggregateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/agg/WindowAggregateITCase.scala
@@ -18,25 +18,24 @@

 package org.apache.flink.table.runtime.batch.sql.agg

+import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO}
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo.TIMESTAMP
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.io.CollectionInputFormat
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.core.io.InputSplit

 import scala.collection.JavaConversions._

 import org.apache.flink.table.api.{TableSchema, _}
 import org.apache.flink.table.runtime.utils.BatchTestBase
 import org.apache.flink.table.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.runtime.utils.TestData._
-import org.apache.flink.table.sources.BatchTableSource
+import org.apache.flink.table.sources.InputFormatTableSource
 import org.apache.flink.table.util.DateTimeTestUtil.UTCTimestamp
 import org.apache.flink.table.util.{CountAggFunction, IntAvgAggFunction, IntSumAggFunction}
 import org.apache.flink.types.Row
-
 import org.junit.{Before, Ignore, Test}

 class WindowAggregateITCase extends BatchTestBase {
@@ -398,17 +397,14 @@ class WindowAggregateITCase extends BatchTestBase {
     //   "a" -> new ColumnStats(10000000L, 1L, 8D, 8, 5, -5),
     //   "b" -> new ColumnStats(8000000L, 0L, 4D, 32, 6.1D, 0D),
     //   "c" -> new ColumnStats(9000000L, 0L, 1024D, 32, 6.1D, 0D))
-    val table = new BatchTableSource[Row] {
-      override def getReturnType: TypeInformation[Row] =
-        new RowTypeInfo(tableSchema.getFieldTypes, tableSchema.getFieldNames)
+    val table = new InputFormatTableSource[Row] {
+      override def getReturnType: TypeInformation[Row] = type3WithTimestamp

       // override def getTableStats: TableStats = new TableStats(10000000L, colStats)

-      override def getBoundedStream(streamEnv: StreamExecutionEnvironment): DataStream[Row] = {
-        streamEnv.createInput(
-          new CollectionInputFormat[Row](data3WithTimestamp,
-            type3WithTimestamp.createSerializer(env.getConfig)),
-          type3WithTimestamp)
+      override def getInputFormat: InputFormat[Row, _ <: InputSplit] = {
+        new CollectionInputFormat[Row](data3WithTimestamp,
+          type3WithTimestamp.createSerializer(env.getConfig))
       }

       override def getTableSchema: TableSchema = tableSchema
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableScanITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableScanITCase.scala
index 67a2f59..9970ffe 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableScanITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/TableScanITCase.scala
@@ -80,7 +80,7 @@ class TableScanITCase extends StreamingTestBase {
     val schema = new TableSchema(Array("name", "ptime"), Array(Types.STRING, Types.SQL_TIMESTAMP))
     val returnType = Types.STRING

-    val tableSource = new TestTableSourceWithTime(schema, returnType, data, null, "ptime")
+    val tableSource = new TestTableSourceWithTime(false, schema, returnType, data, null, "ptime")
     tEnv.registerTableSource(tableName, tableSource)

     val sqlQuery = s"SELECT name FROM $tableName"
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/InMemoryLookupableTableSource.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/InMemoryLookupableTableSource.scala
index 21cf7f8..1abcf2e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/InMemoryLookupableTableSource.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/InMemoryLookupableTableSource.scala
@@ -54,7 +54,6 @@ class InMemoryLookupableTableSource(
     lookupConfig: LookupConfig)
   extends LookupableTableSource[Row]
   with StreamTableSource[Row]
-  with BatchTableSource[Row]
   with DefinedPrimaryKey
   with DefinedIndexes {

@@ -124,10 +123,6 @@ class InMemoryLookupableTableSource(
   override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
     throw new UnsupportedOperationException("This should never be called.")
   }
-
-  override def getBoundedStream(streamEnv: StreamExecutionEnvironment): DataStream[Row] = {
-    throw new UnsupportedOperationException("This should never be called.")
-  }
 }

 object InMemoryLookupableTableSource {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
index 25fdc74..0d92a4d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
@@ -32,28 +32,28 @@ import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
 import org.apache.flink.table.plan.nodes.exec.ExecNode
 import org.apache.flink.table.plan.optimize.program.{FlinkBatchProgram, FlinkStreamProgram}
-import org.apache.flink.table.plan.schema.{BatchTableSourceTable, StreamTableSourceTable}
+import org.apache.flink.table.plan.schema.TableSourceTable
 import org.apache.flink.table.plan.stats.FlinkStatistic
 import org.apache.flink.table.plan.util.{ExecNodePlanDumper, FlinkRelOptUtil}
 import org.apache.flink.table.runtime.utils.{BatchTableEnvUtil, TestingAppendTableSink, TestingRetractTableSink, TestingUpsertTableSink}
-import org.apache.flink.table.sinks.{AppendStreamTableSink, CollectRowTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink}
-import org.apache.flink.table.sources.{BatchTableSource, StreamTableSource}
+import org.apache.flink.table.sinks._
+import org.apache.flink.table.sources.StreamTableSource
 import org.apache.flink.table.types.TypeInfoLogicalTypeConverter
 import org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo
 import org.apache.flink.table.types.logical.LogicalType
-import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
+import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.table.typeutils.BaseRowTypeInfo
 import org.apache.flink.types.Row

 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.sql.SqlExplainLevel
+
 import org.apache.commons.lang3.SystemUtils
+
 import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.Rule
 import org.junit.rules.{ExpectedException, TestName}

-import _root_.java.util.{Set => JSet}
-
 import _root_.scala.collection.JavaConversions._

 /**
@@ -132,13 +132,10 @@ abstract class TableTestUtil(test: TableTestBase) {
       case at: AtomicType[_] => Array[TypeInformation[_]](at)
       case _ => throw new TableException(s"Unsupported type info: $typeInfo")
     }
+    val dataType = TypeConversions.fromLegacyInfoToDataType(typeInfo)
     val tableEnv = getTableEnv
-    val (fieldNames, _) = tableEnv.getFieldInfo(
-      fromLegacyInfoToDataType(typeInfo), fields.map(_.name).toArray)
-    val schema = new TableSchema(fieldNames, fieldTypes)
-    val tableSource = new TestTableSource(schema)
-    tableEnv.registerTableSource(name, tableSource)
-    tableEnv.scan(name)
+    val (fieldNames, _) = tableEnv.getFieldInfo(dataType, fields.map(_.name).toArray)
+    addTableSource(name, fieldTypes, fieldNames)
   }

   /**
@@ -147,14 +144,14 @@ abstract class TableTestUtil(test: TableTestBase) {
    *
    * @param name table name
    * @param types field types
-   * @param names field names
+   * @param fields field names
    * @param statistic statistic of current table
   * @return returns the registered [[Table]].
   */
   def addTableSource(
       name: String,
       types: Array[TypeInformation[_]],
-      names: Array[String],
+      fields: Array[String],
       statistic: FlinkStatistic = FlinkStatistic.UNKNOWN): Table

   /**
@@ -499,8 +496,8 @@ case class StreamTableTestUtil(test: TableTestBase) extends TableTestUtil(test)
       statistic: FlinkStatistic = FlinkStatistic.UNKNOWN): Table = {
     val tableEnv = getTableEnv
     val schema = new TableSchema(names, types)
-    val tableSource = new TestTableSource(schema)
-    val table = new StreamTableSourceTable[BaseRow](tableSource, statistic)
+    val tableSource = new TestTableSource(false, schema)
+    val table = new TableSourceTable[BaseRow](tableSource, true, statistic)
     tableEnv.registerTableInternal(name, table)
     tableEnv.scan(name)
   }
@@ -612,8 +609,8 @@ case class BatchTableTestUtil(test: TableTestBase) extends TableTestUtil(test) {
       statistic: FlinkStatistic = FlinkStatistic.UNKNOWN): Table = {
     val tableEnv = getTableEnv
     val schema = new TableSchema(names, types)
-    val tableSource = new TestTableSource(schema)
-    val table = new BatchTableSourceTable[BaseRow](tableSource, statistic)
+    val tableSource = new TestTableSource(true, schema)
+    val table = new TableSourceTable[BaseRow](tableSource, false, statistic)
     tableEnv.registerTableInternal(name, table)
     tableEnv.scan(name)
   }
@@ -647,14 +644,10 @@ case class BatchTableTestUtil(test: TableTestBase) extends TableTestUtil(test) {
 /**
   * Batch/Stream [[org.apache.flink.table.sources.TableSource]] for testing.
   */
-class TestTableSource(schema: TableSchema)
-  extends BatchTableSource[BaseRow]
-  with StreamTableSource[BaseRow] {
+class TestTableSource(isBatch: Boolean, schema: TableSchema)
+  extends StreamTableSource[BaseRow] {

-  override def getBoundedStream(
-      streamEnv: environment.StreamExecutionEnvironment): DataStream[BaseRow] = {
-    streamEnv.fromCollection(List[BaseRow](), getReturnType)
-  }
+  override def isBounded: Boolean = isBatch

   override def getDataStream(
       execEnv: environment.StreamExecutionEnvironment): DataStream[BaseRow] = {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
index a6f744a..6852e93 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/testTableSources.scala
@@ -33,6 +33,7 @@ import java.util.Collections
 import scala.collection.JavaConversions._

 class TestTableSourceWithTime[T](
+    isBatch: Boolean,
     tableSchema: TableSchema,
     returnType: TypeInformation[T],
     values: Seq[T],
@@ -40,17 +41,14 @@ class TestTableSourceWithTime[T](
     proctime: String = null,
     mapping: Map[String, String] = null)
   extends StreamTableSource[T]
-  with BatchTableSource[T]
   with DefinedRowtimeAttributes
   with DefinedProctimeAttribute
   with DefinedFieldMapping {

-  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T] = {
-    execEnv.fromCollection(values, returnType)
-  }
+  override def isBounded: Boolean = isBatch

-  override def getBoundedStream(streamEnv: StreamExecutionEnvironment): DataStreamSource[T] = {
-    val dataStream = streamEnv.fromCollection(values, returnType)
+  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T] = {
+    val dataStream = execEnv.fromCollection(values, returnType)
     dataStream.getTransformation.setMaxParallelism(1)
     dataStream
   }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala
index b178d7a..2e6401d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala
@@ -40,9 +40,10 @@ import org.apache.flink.table.plan.rules.FlinkRuleSets
 import org.apache.flink.table.plan.schema._
 import org.apache.flink.table.runtime.MapRunner
 import org.apache.flink.table.sinks._
-import org.apache.flink.table.sources.{BatchTableSource, TableSource, TableSourceUtil}
+import org.apache.flink.table.sources.{BatchTableSource, InputFormatTableSource, TableSource, TableSourceUtil}
 import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
 import org.apache.flink.table.typeutils.FieldInfoUtils.{calculateTableSchema, getFieldsInfo, validateInputTypeInfo}
+import org.apache.flink.table.utils.TableConnectorUtils
 import org.apache.flink.types.Row

 /**
@@ -68,15 +69,18 @@ abstract class BatchTableEnvImpl(

   override protected def validateTableSource(tableSource: TableSource[_]): Unit = {
     TableSourceUtil.validateTableSource(tableSource)
-    if (!tableSource.isInstanceOf[BatchTableSource[_]]) {
-      throw new TableException("Only BatchTableSource can be registered in " +
-        "BatchTableEnvironment.")
+    if (!tableSource.isInstanceOf[BatchTableSource[_]] &&
+      !tableSource.isInstanceOf[InputFormatTableSource[_]]) {
+      throw new TableException("Only BatchTableSource and InputFormatTableSource can be registered " +
+        "in BatchTableEnvironment.")
     }
   }

   override protected def validateTableSink(configuredSink: TableSink[_]): Unit = {
-    if (!configuredSink.isInstanceOf[BatchTableSink[_]]) {
-      throw new TableException("Only BatchTableSink can be registered in BatchTableEnvironment.")
+    if (!configuredSink.isInstanceOf[BatchTableSink[_]] &&
+      !configuredSink.isInstanceOf[BoundedTableSink[_]]) {
+      throw new TableException("Only BatchTableSink and BoundedTableSink can be registered " +
+        "in BatchTableEnvironment.")
     }
   }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala
index 69b8164..9078560 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala
@@ -84,7 +84,7 @@ abstract class StreamTableEnvImpl(
     tableSource match {

       // check for proper stream table source
-      case streamTableSource: StreamTableSource[_] =>
+      case streamTableSource: StreamTableSource[_] if !streamTableSource.isBounded =>
         // check that event-time is enabled if table source includes rowtime attributes
         if (TableSourceUtil.hasRowtimeAttribute(streamTableSource) &&
           execEnv.getStreamTimeCharacteristic != TimeCharacteristic.EventTime) {
@@ -93,6 +93,10 @@ abstract class StreamTableEnvImpl(
             s"environment. But is: ${execEnv.getStreamTimeCharacteristic}")
         }

+      case streamTableSource: StreamTableSource[_] if streamTableSource.isBounded =>
+        throw new TableException("Only unbounded StreamTableSource (isBounded returns false) " +
+          "can be registered in StreamTableEnvironment")
+
       // not a stream table source
       case _ =>
         throw new TableException("Only StreamTableSource can be registered in " +
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
index 7f4a5fb..19e0bda 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
@@ -23,13 +23,16 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rex.RexNode
+import org.apache.flink.api.common.io.InputFormat
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
+import org.apache.flink.core.io.InputSplit
 import org.apache.flink.table.api.{BatchQueryConfig, BatchTableEnvImpl, TableException, Types}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
 import org.apache.flink.table.plan.schema.RowSchema
 import org.apache.flink.table.sources._
-import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
+import org.apache.flink.table.types.utils.TypeConversions.{fromDataTypeToLegacyInfo, fromLegacyInfoToDataType}
 import org.apache.flink.types.Row

 /**
  * Flink RelNode to read data from an external source defined by a [[BatchTableSource]].
  */
@@ -37,7 +40,7 @@ class BatchTableSourceScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
-    tableSource: BatchTableSource[_],
+    tableSource: TableSource[_],
     selectedFields: Option[Array[Int]])
   extends PhysicalTableSourceScan(cluster, traitSet, table, tableSource, selectedFields)
   with BatchScan {
@@ -89,7 +92,21 @@ class BatchTableSourceScan(
       selectedFields)

     val config = tableEnv.getConfig
-    val inputDataSet = tableSource.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]]
+    val inputDataSet = tableSource match {
+      case batchSource: BatchTableSource[_] =>
+        batchSource.getDataSet(tableEnv.execEnv)
+      case boundedSource: InputFormatTableSource[_] =>
+        val resultType = fromDataTypeToLegacyInfo(boundedSource.getProducedDataType)
+          .asInstanceOf[TypeInformation[Any]]
+        val inputFormat = boundedSource.getInputFormat
+          .asInstanceOf[InputFormat[Any, _ <: InputSplit]]
+        tableEnv.execEnv
+          .createInput(inputFormat, resultType)
+          .name(boundedSource.explainSource)
+          .asInstanceOf[DataSet[_]]
+      case _ => throw new TableException("Only BatchTableSource and InputFormatTableSource are " +
+        "supported in BatchTableEnvironment.")
+    }
     val outputSchema = new RowSchema(this.getRowType)

     val inputDataType = fromLegacyInfoToDataType(inputDataSet.getType)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
index 8ce97ed..c51c99a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/dataSet/BatchTableSourceScanRule.scala
@@ -50,7 +50,7 @@ class BatchTableSourceScanRule
       rel.getCluster,
       traitSet,
       scan.getTable,
-      scan.tableSource.asInstanceOf[BatchTableSource[_]],
+      scan.tableSource,
       scan.selectedFields
     )
   }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
index fa405d6..e0dc5c3 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/validation/TableSourceValidationTest.scala
@@ -20,17 +20,16 @@ package org.apache.flink.table.api.validation

 import java.util
 import java.util.Collections
-
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.scala.StreamTableEnvironment
-import org.apache.flink.table.api.{TableSchema, Types, ValidationException}
+import org.apache.flink.table.api.{TableException, TableSchema, Types, ValidationException}
 import org.apache.flink.table.sources._
 import org.apache.flink.table.sources.tsextractors.ExistingField
 import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps
-import org.apache.flink.table.utils.TestTableSourceWithTime
+import org.apache.flink.table.utils.{TestInputFormatTableSource, TestTableSourceWithTime}
 import org.apache.flink.types.Row

 import org.junit.Test

@@ -245,4 +244,22 @@ class TableSourceValidationTest {
       // should fail, field can be empty
       .build()
   }
+
+  @Test(expected = classOf[TableException])
+  def testBoundedTableSourceForStreamEnv(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = StreamTableEnvironment.create(env)
+
+    val fieldNames = Array("id", "name")
+    val rowType = new RowTypeInfo(
+      Array(Types.LONG, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
+      fieldNames)
+    val schema = new TableSchema(
+      fieldNames,
+      Array(Types.LONG, Types.STRING))
+    val ts = new TestInputFormatTableSource(schema, rowType, Seq[Row]())
+
+    // should fail because InputFormatTableSource is bounded and therefore
+    // not supported in StreamTableEnvironment
+    tEnv.registerTableSource("testTable", ts)
+  }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
index b152dcb..eb9b31d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
@@ -73,6 +73,39 @@ class TableSourceITCase(
   }

   @Test
+  def testBoundedTableSource(): Unit = {
+    val tableName = "MyTable"
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = BatchTableEnvironment.create(env)
+
+    val data = Seq(
+      Row.of("Mary", new JLong(1L), new JInt(10)),
+      Row.of("Bob", new JLong(2L), new JInt(20)),
+      Row.of("Mary", new JLong(2L), new JInt(30)),
+      Row.of("Liz", new JLong(2001L), new JInt(40)))
+
+    val fieldNames = Array("name", "rtime", "amount")
+    val schema = new TableSchema(fieldNames, Array(Types.STRING, Types.LONG, Types.INT))
+    val rowType = new RowTypeInfo(
+      Array(Types.STRING, Types.LONG, Types.INT).asInstanceOf[Array[TypeInformation[_]]],
+      fieldNames)
+
+    val tableSource = new TestInputFormatTableSource(schema, rowType, data)
+    tEnv.registerTableSource(tableName, tableSource)
+
+    val results = tEnv.scan(tableName)
+      .groupBy('name)
+      .select('name, 'amount.sum)
+      .collect()
+
+    val expected = Seq(
+      "Mary,40",
+      "Bob,20",
+      "Liz,40").mkString("\n")
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
   def testCsvTableSourceWithProjection(): Unit = {
     val csvTable = CommonTestData.getCsvTableSource
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/testTableSources.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/testTableSources.scala
index bbc155b..6f66158 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/testTableSources.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/testTableSources.scala
@@ -18,12 +18,13 @@

 package org.apache.flink.table.utils

-import java.util
-import java.util.{Collections, List => JList}
-
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.io.InputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.io.CollectionInputFormat
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.core.io.InputSplit
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.TableSchema
@@ -33,6 +34,9 @@ import org.apache.flink.table.sources.tsextractors.ExistingField
 import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, PreserveWatermarks}
 import org.apache.flink.types.Row

+import java.util
+import java.util.Collections
+
 import scala.collection.JavaConverters._

 class TestTableSourceWithTime[T](
@@ -225,3 +229,17 @@ class TestPreserveWMTableSource[T](

   override def getTableSchema: TableSchema = tableSchema
 }
+
+class TestInputFormatTableSource[T](
+    tableSchema: TableSchema,
+    returnType: TypeInformation[T],
+    values: Seq[T]) extends InputFormatTableSource[T] {
+
+  override def getInputFormat: InputFormat[T, _ <: InputSplit] = {
+    new CollectionInputFormat[T](values.asJava, returnType.createSerializer(new ExecutionConfig))
+  }
+
+  override def getReturnType: TypeInformation[T] = returnType
+
+  override def getTableSchema: TableSchema = tableSchema
+}
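Taken together, a hedged usage sketch against the legacy planner's Java API (BatchTableEnvironment.create, registerTableSource and sqlQuery are the standard Flink 1.9 entry points; CollectionTableSource is the hypothetical class sketched after the API diffs above):

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.BatchTableEnvironment;

    public class BoundedSourceExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

            // An InputFormatTableSource passes BatchTableEnvImpl.validateTableSource above
            tEnv.registerTableSource("people", new CollectionTableSource());
            Table result = tEnv.sqlQuery("SELECT name, SUM(amount) FROM people GROUP BY name");
            result.printSchema();

            // Registering the same source in a StreamTableEnvironment now throws:
            // "Only unbounded StreamTableSource (isBounded returns false) can be
            //  registered in StreamTableEnvironment"
        }
    }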
