This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 69d8816d164a106f8edf61a768569dafa5b0dc8d
Author: JingsongLi <[email protected]>
AuthorDate: Mon Feb 10 15:35:42 2020 +0800

    [FLINK-15912][table-planner-blink] Support create table source/sink by context in blink planner
---
 .../flink/table/factories/TableFactoryUtil.java    | 39 ++++++++++
 .../table/planner/delegation/PlannerBase.scala     | 16 ++--
 .../planner/plan/schema/CatalogSourceTable.scala   | 48 ++++++------
 .../table/planner/plan/batch/sql/TableScanTest.xml |  8 +-
 .../utils/TestCollectionTableFactory.scala         | 53 ++++---------
 .../table/planner/plan/batch/sql/SinkTest.scala    | 31 --------
 .../planner/plan/common/TableFactoryTest.scala     | 88 ++++++++++++++++++++++
 .../plan/utils/TestContextTableFactory.scala       | 69 +++++++++++++++++
 8 files changed, 245 insertions(+), 107 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
index 57ca4cf..e33d55c 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
@@ -34,6 +34,34 @@ import java.util.Optional;
 public class TableFactoryUtil {
 
        /**
+        * Returns a table source matching the context.
+        */
+       @SuppressWarnings("unchecked")
+       public static <T> TableSource<T> 
findAndCreateTableSource(TableSourceFactory.Context context) {
+               try {
+                       return TableFactoryService
+                                       .find(TableSourceFactory.class, 
context.getTable().toProperties())
+                                       .createTableSource(context);
+               } catch (Throwable t) {
+                       throw new TableException("findAndCreateTableSource 
failed.", t);
+               }
+       }
+
+       /**
+        * Returns a table sink matching the context.
+        */
+       @SuppressWarnings("unchecked")
+       public static <T> TableSink<T> 
findAndCreateTableSink(TableSinkFactory.Context context) {
+               try {
+                       return TableFactoryService
+                                       .find(TableSinkFactory.class, 
context.getTable().toProperties())
+                                       .createTableSink(context);
+               } catch (Throwable t) {
+                       throw new TableException("findAndCreateTableSink 
failed.", t);
+               }
+       }
+
+       /**
         * Returns a table source matching the properties.
         */
        @SuppressWarnings("unchecked")
@@ -86,4 +114,15 @@ public class TableFactoryUtil {
                return Optional.empty();
        }
 
+       /**
+        * Creates a table sink for a {@link CatalogTable} using table factory 
associated with the catalog.
+        */
+       public static Optional<TableSink> 
createTableSinkForCatalogTable(Catalog catalog, TableSinkFactory.Context 
context) {
+               TableFactory tableFactory = 
catalog.getTableFactory().orElse(null);
+               if (tableFactory instanceof TableSinkFactory) {
+                       return Optional.ofNullable(((TableSinkFactory) 
tableFactory).createTableSink(context));
+               }
+               return Optional.empty();
+       }
+
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index ab084e5..47892a1 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -25,7 +25,7 @@ import 
org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.api.{TableConfig, TableEnvironment, 
TableException}
 import org.apache.flink.table.catalog._
 import org.apache.flink.table.delegation.{Executor, Parser, Planner}
-import org.apache.flink.table.factories.{TableFactoryService, 
TableFactoryUtil, TableSinkFactory}
+import org.apache.flink.table.factories.{TableFactoryUtil, 
TableSinkFactoryContextImpl}
 import 
org.apache.flink.table.operations.OutputConversionModifyOperation.UpdateMode
 import org.apache.flink.table.operations._
 import org.apache.flink.table.planner.calcite.{CalciteParser, 
FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory}
@@ -40,7 +40,7 @@ import 
org.apache.flink.table.planner.plan.utils.SameRelObjectShuttle
 import org.apache.flink.table.planner.sinks.DataStreamTableSink
 import 
org.apache.flink.table.planner.sinks.TableSinkUtils.{inferSinkPhysicalSchema, 
validateLogicalPhysicalTypesCompatible, validateSchemaAndApplyImplicitCast, 
validateTableSink}
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil
-import org.apache.flink.table.sinks.{OverwritableTableSink, TableSink}
+import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
 import org.apache.flink.table.utils.TableSchemaUtils
 
@@ -296,19 +296,15 @@ abstract class PlannerBase(
       case Some(s) if s.isInstanceOf[CatalogTable] =>
         val catalog = 
catalogManager.getCatalog(objectIdentifier.getCatalogName)
         val table = s.asInstanceOf[CatalogTable]
+        val context = new TableSinkFactoryContextImpl(
+          objectIdentifier, table, getTableConfig.getConfiguration)
         if (catalog.isPresent && catalog.get().getTableFactory.isPresent) {
-          val objectPath = objectIdentifier.toObjectPath
-          val sink = TableFactoryUtil.createTableSinkForCatalogTable(
-            catalog.get(),
-            table,
-            objectPath)
+          val sink = 
TableFactoryUtil.createTableSinkForCatalogTable(catalog.get(), context)
           if (sink.isPresent) {
             return Option(table, sink.get())
           }
         }
-        val sinkProperties = table.toProperties
-        Option(table, TableFactoryService.find(classOf[TableSinkFactory[_]], 
sinkProperties)
-          .createTableSink(sinkProperties))
+        Option(table, TableFactoryUtil.findAndCreateTableSink(context))
 
       case _ => None
     }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
index 96d35a9..9a82a8d 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/CatalogSourceTable.scala
@@ -18,12 +18,14 @@
 
 package org.apache.flink.table.planner.plan.schema
 
+import org.apache.flink.configuration.ReadableConfig
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.catalog.CatalogTable
-import org.apache.flink.table.factories.{TableFactoryUtil, TableSourceFactory}
+import org.apache.flink.table.factories.{TableFactoryUtil, TableSourceFactory, 
TableSourceFactoryContextImpl}
 import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkRelBuilder}
 import org.apache.flink.table.planner.catalog.CatalogSchemaTable
 import org.apache.flink.table.sources.{StreamTableSource, TableSource, 
TableSourceValidation}
+import org.apache.flink.table.utils.TableConnectorUtils.generateRuntimeName
 
 import org.apache.calcite.plan.{RelOptSchema, RelOptTable}
 import org.apache.calcite.rel.RelNode
@@ -62,12 +64,23 @@ class CatalogSourceTable[T](
       .toMap
   }
 
-  lazy val tableSource: TableSource[T] = 
findAndCreateTableSource().asInstanceOf[TableSource[T]]
-
-  override def getQualifiedName: JList[String] = 
explainSourceAsString(tableSource)
+  override def getQualifiedName: JList[String] = {
+    // Do not explain source, we already have full names, table source should 
be created in toRel.
+    val ret = new util.ArrayList[String](names)
+    // Add class name to distinguish TableSourceTable.
+    val name = generateRuntimeName(getClass, 
catalogTable.getSchema.getFieldNames)
+    ret.add(s"catalog_source: [$name]")
+    ret
+  }
 
   override def toRel(context: RelOptTable.ToRelContext): RelNode = {
     val cluster = context.getCluster
+    val flinkContext = cluster
+        .getPlanner
+        .getContext
+        .unwrap(classOf[FlinkContext])
+
+    val tableSource = 
findAndCreateTableSource(flinkContext.getTableConfig.getConfiguration)
     val tableSourceTable = new TableSourceTable[T](
       relOptSchema,
       schemaTable.getTableIdentifier,
@@ -91,11 +104,7 @@ class CatalogSourceTable[T](
     val relBuilder = FlinkRelBuilder.of(cluster, getRelOptSchema)
     relBuilder.push(scan)
 
-    val toRexFactory = cluster
-        .getPlanner
-        .getContext
-        .unwrap(classOf[FlinkContext])
-        .getSqlExprToRexConverterFactory
+    val toRexFactory = flinkContext.getSqlExprToRexConverterFactory
 
     // 2. push computed column project
     val fieldNames = rowType.getFieldNames.asScala
@@ -140,32 +149,25 @@ class CatalogSourceTable[T](
     relBuilder.build()
   }
 
-  /** Create the table source lazily. */
-  private def findAndCreateTableSource(): TableSource[_] = {
+  /** Create the table source. */
+  private def findAndCreateTableSource(conf: ReadableConfig): TableSource[T] = 
{
     val tableFactoryOpt = schemaTable.getTableFactory
+    val context = new TableSourceFactoryContextImpl(
+      schemaTable.getTableIdentifier, catalogTable, conf)
     val tableSource = if (tableFactoryOpt.isPresent) {
       tableFactoryOpt.get() match {
         case tableSourceFactory: TableSourceFactory[_] =>
-          tableSourceFactory.createTableSource(
-            schemaTable.getTableIdentifier.toObjectPath,
-            catalogTable)
+          tableSourceFactory.createTableSource(context)
         case _ => throw new TableException("Cannot query a sink-only table. "
           + "TableFactory provided by catalog must implement 
TableSourceFactory")
       }
     } else {
-      TableFactoryUtil.findAndCreateTableSource(catalogTable)
+      TableFactoryUtil.findAndCreateTableSource(context)
     }
     if (!tableSource.isInstanceOf[StreamTableSource[_]]) {
       throw new TableException("Catalog tables support only "
         + "StreamTableSource and InputFormatTableSource")
     }
-    tableSource
-  }
-
-  override protected def explainSourceAsString(ts: TableSource[_]): 
JList[String] = {
-    val ret = new util.ArrayList[String](super.explainSourceAsString(ts))
-    // Add class name to distinguish TableSourceTable.
-    ret.add("class: " + classOf[CatalogSourceTable[_]].getSimpleName)
-    ret
+    tableSource.asInstanceOf[TableSource[T]]
   }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml
index 39a67ca..63bea00 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.xml
@@ -100,7 +100,7 @@ Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, 
my_udf(a) AS e])
   <TestCase name="testTableApiScanWithDDL">
     <Resource name="planBefore">
       <![CDATA[
-LogicalTableScan(table=[[default_catalog, default_database, t1, source: 
[CollectionTableSource(a, b)], class: CatalogSourceTable]])
+LogicalTableScan(table=[[default_catalog, default_database, t1, 
catalog_source: [CatalogSourceTable(a, b)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -113,7 +113,7 @@ TableSourceScan(table=[[default_catalog, default_database, 
t1, source: [Collecti
   <TestCase name="testTableApiScanWithTemporaryTable">
     <Resource name="planBefore">
       <![CDATA[
-LogicalTableScan(table=[[default_catalog, default_database, t1, source: 
[CsvTableSource(read fields: word)], class: CatalogSourceTable]])
+LogicalTableScan(table=[[default_catalog, default_database, t1, 
catalog_source: [CatalogSourceTable(word)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -126,7 +126,7 @@ TableSourceScan(table=[[default_catalog, default_database, 
t1, source: [CsvTable
   <TestCase name="testTableApiScanWithWatermark">
     <Resource name="planBefore">
       <![CDATA[
-LogicalTableScan(table=[[default_catalog, default_database, c_watermark_t, 
source: [CollectionTableSource(a, b)], class: CatalogSourceTable]])
+LogicalTableScan(table=[[default_catalog, default_database, c_watermark_t, 
catalog_source: [CatalogSourceTable(a, b, c, d, e)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -140,7 +140,7 @@ Calc(select=[a, b, +(a, 1) AS c, TO_TIMESTAMP(b) AS d, 
my_udf(a) AS e])
   <TestCase name="testTableApiScanWithComputedColumn">
     <Resource name="planBefore">
       <![CDATA[
-LogicalTableScan(table=[[default_catalog, default_database, computed_column_t, 
source: [CollectionTableSource(a, b)], class: CatalogSourceTable]])
+LogicalTableScan(table=[[default_catalog, default_database, computed_column_t, 
catalog_source: [CatalogSourceTable(a, b, c, d, e)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala
index 01c8fb8..170f118 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/factories/utils/TestCollectionTableFactory.scala
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.io.{CollectionInputFormat, 
LocalCollectionOutputFormat}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink, 
DataStreamSource}
@@ -30,12 +29,11 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
 import org.apache.flink.table.api.TableSchema
 import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR
-import org.apache.flink.table.descriptors.{DescriptorProperties, Schema}
-import org.apache.flink.table.factories.{BatchTableSinkFactory, 
BatchTableSourceFactory, StreamTableSinkFactory, StreamTableSourceFactory}
+import org.apache.flink.table.factories.{TableSinkFactory, TableSourceFactory}
 import org.apache.flink.table.functions.{AsyncTableFunction, TableFunction}
 import 
org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory.{getCollectionSink,
 getCollectionSource}
 import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink, 
StreamTableSink, TableSink}
-import org.apache.flink.table.sources.{BatchTableSource, 
LookupableTableSource, StreamTableSource, TableSource}
+import org.apache.flink.table.sources.{BatchTableSource, 
LookupableTableSource, StreamTableSource}
 import org.apache.flink.table.types.DataType
 import org.apache.flink.types.Row
 
@@ -45,35 +43,16 @@ import java.util.{ArrayList => JArrayList, LinkedList => 
JLinkedList, List => JL
 
 import scala.collection.JavaConversions._
 
-class TestCollectionTableFactory
-  extends StreamTableSourceFactory[Row]
-    with StreamTableSinkFactory[Row]
-    with BatchTableSourceFactory[Row]
-    with BatchTableSinkFactory[Row]
-{
+class TestCollectionTableFactory extends TableSourceFactory[Row] with 
TableSinkFactory[Row] {
 
-  override def createTableSource(properties: JMap[String, String]): 
TableSource[Row] = {
-    getCollectionSource(properties)
+  override def createTableSource(
+      context: TableSourceFactory.Context): StreamTableSource[Row] = {
+    getCollectionSource(context)
   }
 
-  override def createTableSink(properties: JMap[String, String]): 
TableSink[Row] = {
-    getCollectionSink(properties)
-  }
-
-  override def createStreamTableSource(properties: JMap[String, String]): 
StreamTableSource[Row] = {
-    getCollectionSource(properties)
-  }
-
-  override def createStreamTableSink(properties: JMap[String, String]): 
StreamTableSink[Row] = {
-    getCollectionSink(properties)
-  }
-
-  override def createBatchTableSource(properties: JMap[String, String]): 
BatchTableSource[Row] = {
-    getCollectionSource(properties)
-  }
-
-  override def createBatchTableSink(properties: JMap[String, String]): 
BatchTableSink[Row] = {
-    getCollectionSink(properties)
+  override def createTableSink(
+      context: TableSinkFactory.Context): StreamTableSink[Row] = {
+    getCollectionSink(context)
   }
 
   override def requiredContext(): JMap[String, String] = {
@@ -118,18 +97,14 @@ object TestCollectionTableFactory {
 
   def getResult: util.List[Row] = RESULT
 
-  def getCollectionSource(props: JMap[String, String]): CollectionTableSource 
= {
-    val properties = new DescriptorProperties()
-    properties.putProperties(props)
-    val schema = properties.getTableSchema(Schema.SCHEMA)
-    val isBounded = properties.getOptionalBoolean(IS_BOUNDED).orElse(true)
+  def getCollectionSource(context: TableSourceFactory.Context): 
CollectionTableSource = {
+    val schema = context.getTable.getSchema
+    val isBounded = context.getTable.getProperties.getOrDefault(IS_BOUNDED, 
"true").toBoolean
     new CollectionTableSource(emitIntervalMS, physicalSchema(schema), 
isBounded)
   }
 
-  def getCollectionSink(props: JMap[String, String]): CollectionTableSink = {
-    val properties = new DescriptorProperties()
-    properties.putProperties(props)
-    val schema = properties.getTableSchema(Schema.SCHEMA)
+  def getCollectionSink(context: TableSinkFactory.Context): 
CollectionTableSink = {
+    val schema = context.getTable.getSchema
     new CollectionTableSink(physicalSchema(schema))
   }
 
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SinkTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SinkTest.scala
index e04f9eb..4a91621 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SinkTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/SinkTest.scala
@@ -20,20 +20,11 @@ package org.apache.flink.table.planner.plan.batch.sql
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{DataTypes, TableSchema}
-import org.apache.flink.table.catalog.{CatalogTableImpl, 
GenericInMemoryCatalog, ObjectPath}
-import org.apache.flink.table.factories.TableSinkFactory
 import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder
 import org.apache.flink.table.planner.utils.TableTestBase
-import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.types.logical.{BigIntType, IntType}
 
 import org.junit.Test
-import org.mockito.{ArgumentMatchers, Mockito}
-
-import java.util.Optional
-
-import scala.collection.JavaConverters._
 
 class SinkTest extends TableTestBase {
 
@@ -68,26 +59,4 @@ class SinkTest extends TableTestBase {
 
     util.verifyPlan()
   }
-
-  @Test
-  def testCatalogTableSink(): Unit = {
-    val schemaBuilder = new TableSchema.Builder()
-    schemaBuilder.fields(Array("i"), Array(DataTypes.INT()))
-    val schema = schemaBuilder.build()
-    val sink = util.createCollectTableSink(schema.getFieldNames, Array(INT))
-    val catalog = Mockito.spy(new GenericInMemoryCatalog("dummy"))
-    val factory = Mockito.mock(classOf[TableSinkFactory[_]])
-    
Mockito.when[Optional[_]](catalog.getTableFactory).thenReturn(Optional.of(factory))
-    Mockito.when[TableSink[_]](factory.createTableSink(
-      ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(sink)
-    util.tableEnv.registerCatalog(catalog.getName, catalog)
-    util.tableEnv.useCatalog(catalog.getName)
-    val catalogTable = new CatalogTableImpl(schema, Map[String, 
String]().asJava, "")
-    catalog.createTable(new ObjectPath("default", "tbl"), catalogTable, false)
-    util.tableEnv.sqlQuery("select 1").insertInto("tbl")
-    util.tableEnv.explain(false)
-    // verify we tried to get table factory from catalog
-    Mockito.verify(catalog, Mockito.atLeast(1)).getTableFactory
-  }
-
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/TableFactoryTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/TableFactoryTest.scala
new file mode 100644
index 0000000..9661bfc
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/TableFactoryTest.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.common
+
+import org.apache.flink.table.catalog.{GenericInMemoryCatalog, 
ObjectIdentifier}
+import org.apache.flink.table.factories.TableFactory
+import org.apache.flink.table.planner.plan.utils.TestContextTableFactory
+import org.apache.flink.table.planner.utils.TableTestBase
+
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{Assert, Test}
+
+import java.util.Optional
+
+@RunWith(classOf[Parameterized])
+class TableFactoryTest(isBatch: Boolean) extends TableTestBase {
+
+  private val util = if (isBatch) batchTestUtil() else streamTestUtil()
+
+  @Test
+  def testTableSourceSinkFactory(): Unit = {
+    val factory = new TestContextTableFactory(
+      ObjectIdentifier.of("cat", "default", "t1"),
+      ObjectIdentifier.of("cat", "default", "t2"))
+    
util.tableEnv.getConfig.getConfiguration.setBoolean(TestContextTableFactory.REQUIRED_KEY,
 true)
+    util.tableEnv.registerCatalog("cat", new GenericInMemoryCatalog("default") 
{
+      override def getTableFactory: Optional[TableFactory] = 
Optional.of(factory)
+    })
+    util.tableEnv.useCatalog("cat")
+
+    val sourceDDL =
+      """
+        |create table t1(
+        |  a int,
+        |  b varchar,
+        |  c as a + 1
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+    val sinkDDL =
+      """
+        |create table t2(
+        |  a int,
+        |  b as c - 1,
+        |  c int
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+    val query =
+      """
+        |insert into t2
+        |select t1.a, t1.c from t1
+      """.stripMargin
+    util.tableEnv.sqlUpdate(sourceDDL)
+    util.tableEnv.sqlUpdate(sinkDDL)
+    util.tableEnv.sqlUpdate(query)
+
+    util.tableEnv.explain(false)
+    Assert.assertTrue(factory.hasInvokedSource)
+    Assert.assertTrue(factory.hasInvokedSink)
+  }
+}
+
+object TableFactoryTest {
+  @Parameterized.Parameters(name = "isBatch: {0}")
+  def parameters(): java.util.Collection[Boolean] = {
+    java.util.Arrays.asList(true, false)
+  }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/TestContextTableFactory.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/TestContextTableFactory.scala
new file mode 100644
index 0000000..9e60cd7
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/utils/TestContextTableFactory.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.utils
+
+import org.apache.flink.configuration.{ConfigOption, ConfigOptions}
+import org.apache.flink.table.catalog.ObjectIdentifier
+import org.apache.flink.table.factories.{TableFactoryUtil, TableSinkFactory, 
TableSourceFactory}
+import 
org.apache.flink.table.planner.plan.utils.TestContextTableFactory.REQUIRED_KEY
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
+
+import org.junit.Assert
+
+import java.{lang, util}
+
+/**
+  * Test [[TableSourceFactory]] and [[TableSinkFactory]] for context.
+  */
+class TestContextTableFactory[T](
+    sourceIdentifier: ObjectIdentifier,
+    sinkIdentifier: ObjectIdentifier)
+    extends TableSourceFactory[T] with TableSinkFactory[T] {
+
+  var hasInvokedSource = false
+  var hasInvokedSink = false
+
+  override def requiredContext(): util.Map[String, String] = {
+    throw new UnsupportedOperationException
+  }
+
+  override def supportedProperties(): util.List[String] = {
+    throw new UnsupportedOperationException
+  }
+
+  override def createTableSource(context: TableSourceFactory.Context): 
TableSource[T] = {
+    Assert.assertTrue(context.getConfiguration.get(REQUIRED_KEY))
+    Assert.assertEquals(sourceIdentifier, context.getObjectIdentifier)
+    hasInvokedSource = true
+    TableFactoryUtil.findAndCreateTableSource(context)
+  }
+
+  override def createTableSink(context: TableSinkFactory.Context): 
TableSink[T] = {
+    Assert.assertTrue(context.getConfiguration.get(REQUIRED_KEY))
+    Assert.assertEquals(sinkIdentifier, context.getObjectIdentifier)
+    hasInvokedSink = true
+    TableFactoryUtil.findAndCreateTableSink(context)
+  }
+}
+
+object TestContextTableFactory{
+  val REQUIRED_KEY: ConfigOption[lang.Boolean] = ConfigOptions
+      .key("testing.required.key").booleanType().defaultValue(false)
+}

Reply via email to