This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 69d8816d164a106f8edf61a768569dafa5b0dc8d Author: JingsongLi <[email protected]> AuthorDate: Mon Feb 10 15:35:42 2020 +0800 [FLINK-15912][table-planner-blink] Support create table source/sink by context in blink planner --- .../flink/table/factories/TableFactoryUtil.java | 39 ++++++++++ .../table/planner/delegation/PlannerBase.scala | 16 ++-- .../planner/plan/schema/CatalogSourceTable.scala | 48 ++++++------ .../table/planner/plan/batch/sql/TableScanTest.xml | 8 +- .../utils/TestCollectionTableFactory.scala | 53 ++++--------- .../table/planner/plan/batch/sql/SinkTest.scala | 31 -------- .../planner/plan/common/TableFactoryTest.scala | 88 ++++++++++++++++++++++ .../plan/utils/TestContextTableFactory.scala | 69 +++++++++++++++++ 8 files changed, 245 insertions(+), 107 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java index 57ca4cf..e33d55c 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java @@ -34,6 +34,34 @@ import java.util.Optional; public class TableFactoryUtil { /** + * Returns a table source matching the descriptor. + */ + @SuppressWarnings("unchecked") + public static <T> TableSource<T> findAndCreateTableSource(TableSourceFactory.Context context) { + try { + return TableFactoryService + .find(TableSourceFactory.class, context.getTable().toProperties()) + .createTableSource(context); + } catch (Throwable t) { + throw new TableException("findAndCreateTableSource failed.", t); + } + } + + /** + * Returns a table sink matching the context. + */ + @SuppressWarnings("unchecked") + public static <T> TableSink<T> findAndCreateTableSink(TableSinkFactory.Context context) { + try { + return TableFactoryService + .find(TableSinkFactory.class, context.getTable().toProperties()) + .createTableSink(context); + } catch (Throwable t) { + throw new TableException("findAndCreateTableSink failed.", t); + } + } + + /** * Returns a table source matching the properties. */ @SuppressWarnings("unchecked") @@ -86,4 +114,15 @@ public class TableFactoryUtil { return Optional.empty(); } + /** + * Creates a table sink for a {@link CatalogTable} using table factory associated with the catalog. + */ + public static Optional<TableSink> createTableSinkForCatalogTable(Catalog catalog, TableSinkFactory.Context context) { + TableFactory tableFactory = catalog.getTableFactory().orElse(null); + if (tableFactory instanceof TableSinkFactory) { + return Optional.ofNullable(((TableSinkFactory) tableFactory).createTableSink(context)); + } + return Optional.empty(); + } + } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala index ab084e5..47892a1 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala @@ -25,7 +25,7 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.api.{TableConfig, TableEnvironment, TableException} import org.apache.flink.table.catalog._ import org.apache.flink.table.delegation.{Executor, Parser, Planner} -import org.apache.flink.table.factories.{TableFactoryService, TableFactoryUtil, TableSinkFactory} +import org.apache.flink.table.factories.{TableFactoryUtil, TableSinkFactoryContextImpl} import org.apache.flink.table.operations.OutputConversionModifyOperation.UpdateMode import org.apache.flink.table.operations._ import org.apache.flink.table.planner.calcite.{CalciteParser, FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory} @@ -40,7 +40,7 @@ import org.apache.flink.table.planner.plan.utils.SameRelObjectShuttle import org.apache.flink.table.planner.sinks.DataStreamTableSink import org.apache.flink.table.planner.sinks.TableSinkUtils.{inferSinkPhysicalSchema, validateLogicalPhysicalTypesCompatible, validateSchemaAndApplyImplicitCast, validateTableSink} import org.apache.flink.table.planner.utils.JavaScalaConversionUtil -import org.apache.flink.table.sinks.{OverwritableTableSink, TableSink} +import org.apache.flink.table.sinks.TableSink import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter import org.apache.flink.table.utils.TableSchemaUtils @@ -296,19 +296,15 @@ abstract class PlannerBase( case Some(s) if s.isInstanceOf[CatalogTable] => val catalog = catalogManager.getCatalog(objectIdentifier.getCatalogName) val table = s.asInstanceOf[CatalogTable] + val context = new TableSinkFactoryContextImpl( + objectIdentifier, table, getTableConfig.getConfiguration) if (catalog.isPresent && catalog.get().getTableFactory.isPresent) { - val objectPath = objectIdentifier.toObjectPath - val sink = TableFactoryUtil.createTableSinkForCatalogTable( - catalog.get(), - table, - objectPath) + val sink = TableFactoryUtil.createTableSinkForCatalogTable(catalog.get(), context) if (sink.isPresent) { return Option(table, sink.get()) } } - val sinkProperties = table.toProperties - Option(table, TableFactoryService.find(classOf[TableSinkFactory[_]], sinkProperties) - .createTableSink(sinkProperties)) + Option(table, TableFactoryUtil.findAndCreateTableSink(context)) case _ => None } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala index 96d35a9..9a82a8d 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala @@ -18,12 +18,14 @@ package org.apache.flink.table.planner.plan.schema +import org.apache.flink.configuration.ReadableConfig import org.apache.flink.table.api.TableException import org.apache.flink.table.catalog.CatalogTable -import org.apache.flink.table.factories.{TableFactoryUtil, TableSourceFactory} +import org.apache.flink.table.factories.{TableFactoryUtil, TableSourceFactory, TableSourceFactoryContextImpl} import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkRelBuilder} import org.apache.flink.table.planner.catalog.CatalogSchemaTable import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceValidation} +import org.apache.flink.table.utils.TableConnectorUtils.generateRuntimeName import org.apache.calcite.plan.{RelOptSchema, RelOptTable} import org.apache.calcite.rel.RelNode @@ -62,12 +64,23 @@ class CatalogSourceTable[T]( .toMap } - lazy val tableSource: TableSource[T] = findAndCreateTableSource().asInstanceOf[TableSource[T]] - - override def getQualifiedName: JList[String] = explainSourceAsString(tableSource) + override def getQualifiedName: JList[String] = { + // Do not explain source, we already have full names, table source should be created in toRel. + val ret = new util.ArrayList[String](names) + // Add class name to distinguish TableSourceTable. + val name = generateRuntimeName(getClass, catalogTable.getSchema.getFieldNames) + ret.add(s"catalog_source: [$name]") + ret + } override def toRel(context: RelOptTable.ToRelContext): RelNode = { val cluster = context.getCluster + val flinkContext = cluster + .getPlanner + .getContext + .unwrap(classOf[FlinkContext]) + + val tableSource = findAndCreateTableSource(flinkContext.getTableConfig.getConfiguration) val tableSourceTable = new TableSourceTable[T]( relOptSchema, schemaTable.getTableIdentifier, @@ -91,11 +104,7 @@ class CatalogSourceTable[T]( val relBuilder = FlinkRelBuilder.of(cluster, getRelOptSchema) relBuilder.push(scan) - val toRexFactory = cluster - .getPlanner - .getContext - .unwrap(classOf[FlinkContext]) - .getSqlExprToRexConverterFactory + val toRexFactory = flinkContext.getSqlExprToRexConverterFactory // 2. push computed column project val fieldNames = rowType.getFieldNames.asScala @@ -140,32 +149,25 @@ class CatalogSourceTable[T]( relBuilder.build() } - /** Create the table source lazily. */ - private def findAndCreateTableSource(): TableSource[_] = { + /** Create the table source. */ + private def findAndCreateTableSource(conf: ReadableConfig): TableSource[T] = { val tableFactoryOpt = schemaTable.getTableFactory + val context = new TableSourceFactoryContextImpl( + schemaTable.getTableIdentifier, catalogTable, conf) val tableSource = if (tableFactoryOpt.isPresent) { tableFactoryOpt.get() match { case tableSourceFactory: TableSourceFactory[_] => - tableSourceFactory.createTableSource( - schemaTable.getTableIdentifier.toObjectPath, - catalogTable) + tableSourceFactory.createTableSource(context) case _ => throw new TableException("Cannot query a sink-only table. " + "TableFactory provided by catalog must implement TableSourceFactory") } } else { - TableFactoryUtil.findAndCreateTableSource(catalogTable) + TableFactoryUtil.findAndCreateTableSource(context) } if (!tableSource.isInstanceOf[StreamTableSource[_]]) { throw new TableException("Catalog tables support only " + "StreamTableSource and InputFormatTableSource") } - tableSource - } - - override protected def explainSourceAsString(ts: TableSource[_]): JList[String] = { - val ret = new util.ArrayList[String](super.explainSourceAsString(ts)) - // Add class name to distinguish TableSourceTable. - ret.add("class: " + classOf[CatalogSourceTable[_]].getSimpleName) - ret + tableSource.asInstanceOf[TableSource[T]] } } diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml index 39a67ca..63bea00 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml @@ -100,7 +100,7 @@ Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e]) <TestCase name="testTableApiScanWithDDL"> <Resource name="planBefore"> <![CDATA[ -LogicalTableScan(table=[[default_catalog, default_database, t1, source: [CollectionTableSource(a, b)], class: CatalogSourceTable]]) +LogicalTableScan(table=[[default_catalog, default_database, t1, catalog_source: [CatalogSourceTable(a, b)]]]) ]]> </Resource> <Resource name="planAfter"> @@ -113,7 +113,7 @@ TableSourceScan(table=[[default_catalog, default_database, t1, source: [Collecti <TestCase name="testTableApiScanWithTemporaryTable"> <Resource name="planBefore"> <![CDATA[ -LogicalTableScan(table=[[default_catalog, default_database, t1, source: [CsvTableSource(read fields: word)], class: CatalogSourceTable]]) +LogicalTableScan(table=[[default_catalog, default_database, t1, catalog_source: [CatalogSourceTable(word)]]]) ]]> </Resource> <Resource name="planAfter"> @@ -126,7 +126,7 @@ TableSourceScan(table=[[default_catalog, default_database, t1, source: [CsvTable <TestCase name="testTableApiScanWithWatermark"> <Resource name="planBefore"> <![CDATA[ -LogicalTableScan(table=[[default_catalog, default_database, c_watermark_t, source: [CollectionTableSource(a, b)], class: CatalogSourceTable]]) +LogicalTableScan(table=[[default_catalog, default_database, c_watermark_t, catalog_source: [CatalogSourceTable(a, b, c, d, e)]]]) ]]> </Resource> <Resource name="planAfter"> @@ -140,7 +140,7 @@ Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, my_udf(a) AS e]) <TestCase name="testTableApiScanWithComputedColumn"> <Resource name="planBefore"> <![CDATA[ -LogicalTableScan(table=[[default_catalog, default_database, computed_column_t, source: [CollectionTableSource(a, b)], class: CatalogSourceTable]]) +LogicalTableScan(table=[[default_catalog, default_database, computed_column_t, catalog_source: [CatalogSourceTable(a, b, c, d, e)]]]) ]]> </Resource> <Resource name="planAfter"> diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala index 01c8fb8..170f118 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala @@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.TypeSerializer import org.apache.flink.api.java.io.{CollectionInputFormat, LocalCollectionOutputFormat} -import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink, DataStreamSource} @@ -30,12 +29,11 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.api.functions.sink.RichSinkFunction import org.apache.flink.table.api.TableSchema import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR -import org.apache.flink.table.descriptors.{DescriptorProperties, Schema} -import org.apache.flink.table.factories.{BatchTableSinkFactory, BatchTableSourceFactory, StreamTableSinkFactory, StreamTableSourceFactory} +import org.apache.flink.table.factories.{TableSinkFactory, TableSourceFactory} import org.apache.flink.table.functions.{AsyncTableFunction, TableFunction} import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory.{getCollectionSink, getCollectionSource} import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink, StreamTableSink, TableSink} -import org.apache.flink.table.sources.{BatchTableSource, LookupableTableSource, StreamTableSource, TableSource} +import org.apache.flink.table.sources.{BatchTableSource, LookupableTableSource, StreamTableSource} import org.apache.flink.table.types.DataType import org.apache.flink.types.Row @@ -45,35 +43,16 @@ import java.util.{ArrayList => JArrayList, LinkedList => JLinkedList, List => JL import scala.collection.JavaConversions._ -class TestCollectionTableFactory - extends StreamTableSourceFactory[Row] - with StreamTableSinkFactory[Row] - with BatchTableSourceFactory[Row] - with BatchTableSinkFactory[Row] -{ +class TestCollectionTableFactory extends TableSourceFactory[Row] with TableSinkFactory[Row] { - override def createTableSource(properties: JMap[String, String]): TableSource[Row] = { - getCollectionSource(properties) + override def createTableSource( + context: TableSourceFactory.Context): StreamTableSource[Row] = { + getCollectionSource(context) } - override def createTableSink(properties: JMap[String, String]): TableSink[Row] = { - getCollectionSink(properties) - } - - override def createStreamTableSource(properties: JMap[String, String]): StreamTableSource[Row] = { - getCollectionSource(properties) - } - - override def createStreamTableSink(properties: JMap[String, String]): StreamTableSink[Row] = { - getCollectionSink(properties) - } - - override def createBatchTableSource(properties: JMap[String, String]): BatchTableSource[Row] = { - getCollectionSource(properties) - } - - override def createBatchTableSink(properties: JMap[String, String]): BatchTableSink[Row] = { - getCollectionSink(properties) + override def createTableSink( + context: TableSinkFactory.Context): StreamTableSink[Row] = { + getCollectionSink(context) } override def requiredContext(): JMap[String, String] = { @@ -118,18 +97,14 @@ object TestCollectionTableFactory { def getResult: util.List[Row] = RESULT - def getCollectionSource(props: JMap[String, String]): CollectionTableSource = { - val properties = new DescriptorProperties() - properties.putProperties(props) - val schema = properties.getTableSchema(Schema.SCHEMA) - val isBounded = properties.getOptionalBoolean(IS_BOUNDED).orElse(true) + def getCollectionSource(context: TableSourceFactory.Context): CollectionTableSource = { + val schema = context.getTable.getSchema + val isBounded = context.getTable.getProperties.getOrDefault(IS_BOUNDED, "true").toBoolean new CollectionTableSource(emitIntervalMS, physicalSchema(schema), isBounded) } - def getCollectionSink(props: JMap[String, String]): CollectionTableSink = { - val properties = new DescriptorProperties() - properties.putProperties(props) - val schema = properties.getTableSchema(Schema.SCHEMA) + def getCollectionSink(context: TableSinkFactory.Context): CollectionTableSink = { + val schema = context.getTable.getSchema new CollectionTableSink(physicalSchema(schema)) } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SinkTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SinkTest.scala index e04f9eb..4a91621 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SinkTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SinkTest.scala @@ -20,20 +20,11 @@ package org.apache.flink.table.planner.plan.batch.sql import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.{DataTypes, TableSchema} -import org.apache.flink.table.catalog.{CatalogTableImpl, GenericInMemoryCatalog, ObjectPath} -import org.apache.flink.table.factories.TableSinkFactory import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder import org.apache.flink.table.planner.utils.TableTestBase -import org.apache.flink.table.sinks.TableSink import org.apache.flink.table.types.logical.{BigIntType, IntType} import org.junit.Test -import org.mockito.{ArgumentMatchers, Mockito} - -import java.util.Optional - -import scala.collection.JavaConverters._ class SinkTest extends TableTestBase { @@ -68,26 +59,4 @@ class SinkTest extends TableTestBase { util.verifyPlan() } - - @Test - def testCatalogTableSink(): Unit = { - val schemaBuilder = new TableSchema.Builder() - schemaBuilder.fields(Array("i"), Array(DataTypes.INT())) - val schema = schemaBuilder.build() - val sink = util.createCollectTableSink(schema.getFieldNames, Array(INT)) - val catalog = Mockito.spy(new GenericInMemoryCatalog("dummy")) - val factory = Mockito.mock(classOf[TableSinkFactory[_]]) - Mockito.when[Optional[_]](catalog.getTableFactory).thenReturn(Optional.of(factory)) - Mockito.when[TableSink[_]](factory.createTableSink( - ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(sink) - util.tableEnv.registerCatalog(catalog.getName, catalog) - util.tableEnv.useCatalog(catalog.getName) - val catalogTable = new CatalogTableImpl(schema, Map[String, String]().asJava, "") - catalog.createTable(new ObjectPath("default", "tbl"), catalogTable, false) - util.tableEnv.sqlQuery("select 1").insertInto("tbl") - util.tableEnv.explain(false) - // verify we tried to get table factory from catalog - Mockito.verify(catalog, Mockito.atLeast(1)).getTableFactory - } - } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/TableFactoryTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/TableFactoryTest.scala new file mode 100644 index 0000000..9661bfc --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/TableFactoryTest.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.common + +import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectIdentifier} +import org.apache.flink.table.factories.TableFactory +import org.apache.flink.table.planner.plan.utils.TestContextTableFactory +import org.apache.flink.table.planner.utils.TableTestBase + +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import org.junit.{Assert, Test} + +import java.util.Optional + +@RunWith(classOf[Parameterized]) +class TableFactoryTest(isBatch: Boolean) extends TableTestBase { + + private val util = if (isBatch) batchTestUtil() else streamTestUtil() + + @Test + def testTableSourceSinkFactory(): Unit = { + val factory = new TestContextTableFactory( + ObjectIdentifier.of("cat", "default", "t1"), + ObjectIdentifier.of("cat", "default", "t2")) + util.tableEnv.getConfig.getConfiguration.setBoolean(TestContextTableFactory.REQUIRED_KEY, true) + util.tableEnv.registerCatalog("cat", new GenericInMemoryCatalog("default") { + override def getTableFactory: Optional[TableFactory] = Optional.of(factory) + }) + util.tableEnv.useCatalog("cat") + + val sourceDDL = + """ + |create table t1( + | a int, + | b varchar, + | c as a + 1 + |) with ( + | 'connector' = 'COLLECTION' + |) + """.stripMargin + val sinkDDL = + """ + |create table t2( + | a int, + | b as c - 1, + | c int + |) with ( + | 'connector' = 'COLLECTION' + |) + """.stripMargin + val query = + """ + |insert into t2 + |select t1.a, t1.c from t1 + """.stripMargin + util.tableEnv.sqlUpdate(sourceDDL) + util.tableEnv.sqlUpdate(sinkDDL) + util.tableEnv.sqlUpdate(query) + + util.tableEnv.explain(false) + Assert.assertTrue(factory.hasInvokedSource) + Assert.assertTrue(factory.hasInvokedSink) + } +} + +object TableFactoryTest { + @Parameterized.Parameters(name = "isBatch: {0}") + def parameters(): java.util.Collection[Boolean] = { + java.util.Arrays.asList(true, false) + } +} diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/TestContextTableFactory.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/TestContextTableFactory.scala new file mode 100644 index 0000000..9e60cd7 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/TestContextTableFactory.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.utils + +import org.apache.flink.configuration.{ConfigOption, ConfigOptions} +import org.apache.flink.table.catalog.ObjectIdentifier +import org.apache.flink.table.factories.{TableFactoryUtil, TableSinkFactory, TableSourceFactory} +import org.apache.flink.table.planner.plan.utils.TestContextTableFactory.REQUIRED_KEY +import org.apache.flink.table.sinks.TableSink +import org.apache.flink.table.sources.TableSource + +import org.junit.Assert + +import java.{lang, util} + +/** + * Test [[TableSourceFactory]] and [[TableSinkFactory]] for context. + */ +class TestContextTableFactory[T]( + sourceIdentifier: ObjectIdentifier, + sinkIdentifier: ObjectIdentifier) + extends TableSourceFactory[T] with TableSinkFactory[T] { + + var hasInvokedSource = false + var hasInvokedSink = false + + override def requiredContext(): util.Map[String, String] = { + throw new UnsupportedOperationException + } + + override def supportedProperties(): util.List[String] = { + throw new UnsupportedOperationException + } + + override def createTableSource(context: TableSourceFactory.Context): TableSource[T] = { + Assert.assertTrue(context.getConfiguration.get(REQUIRED_KEY)) + Assert.assertEquals(sourceIdentifier, context.getObjectIdentifier) + hasInvokedSource = true + TableFactoryUtil.findAndCreateTableSource(context) + } + + override def createTableSink(context: TableSinkFactory.Context): TableSink[T] = { + Assert.assertTrue(context.getConfiguration.get(REQUIRED_KEY)) + Assert.assertEquals(sinkIdentifier, context.getObjectIdentifier) + hasInvokedSink = true + TableFactoryUtil.findAndCreateTableSink(context) + } +} + +object TestContextTableFactory{ + val REQUIRED_KEY: ConfigOption[lang.Boolean] = ConfigOptions + .key("testing.required.key").booleanType().defaultValue(false) +}
