This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e280ffc8db37697104809c9fed8b0ed2a850372c
Author: JingsongLi <[email protected]>
AuthorDate: Tue Feb 11 14:28:02 2020 +0800

    [FLINK-15912][table-planner] Support create table source/sink by context in 
legacy planner
---
 .../flink/table/factories/TableFactoryUtil.java    | 12 ----
 .../flink/table/catalog/CatalogCalciteSchema.java  | 11 +++-
 .../table/catalog/CatalogManagerCalciteSchema.java |  8 ++-
 .../flink/table/catalog/DatabaseCalciteSchema.java | 27 +++++---
 .../flink/table/api/internal/TableEnvImpl.scala    | 15 ++---
 .../apache/flink/table/planner/StreamPlanner.scala | 16 ++---
 .../table/catalog/DatabaseCalciteSchemaTest.java   |  4 +-
 .../table/sqlexec/SqlToOperationConverterTest.java |  2 +-
 .../table/api/stream/sql/TableFactoryTest.scala    | 74 ++++++++++++++++++++++
 .../table/utils/TestContextTableFactory.scala      | 69 ++++++++++++++++++++
 10 files changed, 191 insertions(+), 47 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
index e33d55c..bea892e 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.factories;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.TableSource;
 
@@ -106,17 +105,6 @@ public class TableFactoryUtil {
        /**
         * Creates a table sink for a {@link CatalogTable} using table factory 
associated with the catalog.
         */
-       public static Optional<TableSink> 
createTableSinkForCatalogTable(Catalog catalog, CatalogTable catalogTable, 
ObjectPath tablePath) {
-               TableFactory tableFactory = 
catalog.getTableFactory().orElse(null);
-               if (tableFactory instanceof TableSinkFactory) {
-                       return Optional.ofNullable(((TableSinkFactory) 
tableFactory).createTableSink(tablePath, catalogTable));
-               }
-               return Optional.empty();
-       }
-
-       /**
-        * Creates a table sink for a {@link CatalogTable} using table factory 
associated with the catalog.
-        */
        public static Optional<TableSink> 
createTableSinkForCatalogTable(Catalog catalog, TableSinkFactory.Context 
context) {
                TableFactory tableFactory = 
catalog.getTableFactory().orElse(null);
                if (tableFactory instanceof TableSinkFactory) {
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
index 5be5a79..f5dc057 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableConfig;
 
 import org.apache.calcite.linq4j.tree.Expression;
 import org.apache.calcite.rel.type.RelProtoDataType;
@@ -44,11 +45,17 @@ public class CatalogCalciteSchema implements Schema {
        private final boolean isStreamingMode;
        private final String catalogName;
        private final CatalogManager catalogManager;
+       private final TableConfig tableConfig;
 
-       public CatalogCalciteSchema(boolean isStreamingMode, String 
catalogName, CatalogManager catalogManager) {
+       public CatalogCalciteSchema(
+                       boolean isStreamingMode,
+                       String catalogName,
+                       CatalogManager catalogManager,
+                       TableConfig tableConfig) {
                this.isStreamingMode = isStreamingMode;
                this.catalogName = catalogName;
                this.catalogManager = catalogManager;
+               this.tableConfig = tableConfig;
        }
 
        /**
@@ -60,7 +67,7 @@ public class CatalogCalciteSchema implements Schema {
        @Override
        public Schema getSubSchema(String schemaName) {
                if (catalogManager.schemaExists(catalogName, schemaName)) {
-                       return new DatabaseCalciteSchema(isStreamingMode, 
schemaName, catalogName, catalogManager);
+                       return new DatabaseCalciteSchema(isStreamingMode, 
schemaName, catalogName, catalogManager, tableConfig);
                } else {
                        return null;
                }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java
index e7376fc..1ec9783 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.catalog;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableConfig;
 
 import org.apache.calcite.linq4j.tree.Expression;
 import org.apache.calcite.rel.type.RelProtoDataType;
@@ -43,10 +44,13 @@ import java.util.Set;
 public class CatalogManagerCalciteSchema implements Schema {
 
        private final CatalogManager catalogManager;
+       private final TableConfig tableConfig;
        private boolean isStreamingMode;
 
-       public CatalogManagerCalciteSchema(CatalogManager catalogManager, 
boolean isStreamingMode) {
+       public CatalogManagerCalciteSchema(
+                       CatalogManager catalogManager, TableConfig tableConfig, 
boolean isStreamingMode) {
                this.catalogManager = catalogManager;
+               this.tableConfig = tableConfig;
                this.isStreamingMode = isStreamingMode;
        }
 
@@ -83,7 +87,7 @@ public class CatalogManagerCalciteSchema implements Schema {
        @Override
        public Schema getSubSchema(String name) {
                if (catalogManager.schemaExists(name)) {
-                       return new CatalogCalciteSchema(isStreamingMode, name, 
catalogManager);
+                       return new CatalogCalciteSchema(isStreamingMode, name, 
catalogManager, tableConfig);
                } else {
                        return null;
                }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
index 1dd5b23..6226dff 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
@@ -18,12 +18,14 @@
 
 package org.apache.flink.table.catalog;
 
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.calcite.FlinkTypeFactory;
 import org.apache.flink.table.factories.TableFactory;
 import org.apache.flink.table.factories.TableFactoryUtil;
 import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.table.factories.TableSourceFactoryContextImpl;
 import org.apache.flink.table.plan.schema.TableSinkTable;
 import org.apache.flink.table.plan.schema.TableSourceTable;
 import org.apache.flink.table.plan.stats.FlinkStatistic;
@@ -57,16 +59,19 @@ class DatabaseCalciteSchema implements Schema {
        private final String catalogName;
        private final String databaseName;
        private final CatalogManager catalogManager;
+       private final TableConfig tableConfig;
 
        public DatabaseCalciteSchema(
                        boolean isStreamingMode,
                        String databaseName,
                        String catalogName,
-                       CatalogManager catalogManager) {
+                       CatalogManager catalogManager,
+                       TableConfig tableConfig) {
                this.isStreamingMode = isStreamingMode;
                this.databaseName = databaseName;
                this.catalogName = catalogName;
                this.catalogManager = catalogManager;
+               this.tableConfig = tableConfig;
        }
 
        @Override
@@ -83,20 +88,20 @@ class DatabaseCalciteSchema implements Schema {
                                                
.flatMap(Catalog::getTableFactory)
                                                .orElse(null);
                                }
-                               return convertTable(identifier.toObjectPath(), 
table, tableFactory);
+                               return convertTable(identifier, table, 
tableFactory);
                        })
                        .orElse(null);
        }
 
-       private Table convertTable(ObjectPath tablePath, CatalogBaseTable 
table, @Nullable TableFactory tableFactory) {
+       private Table convertTable(ObjectIdentifier identifier, 
CatalogBaseTable table, @Nullable TableFactory tableFactory) {
                if (table instanceof QueryOperationCatalogView) {
                        return 
QueryOperationCatalogViewTable.createCalciteTable(((QueryOperationCatalogView) 
table));
                } else if (table instanceof ConnectorCatalogTable) {
                        return convertConnectorTable((ConnectorCatalogTable<?, 
?>) table);
                } else if (table instanceof CatalogTable) {
-                       return convertCatalogTable(tablePath, (CatalogTable) 
table, tableFactory);
+                       return convertCatalogTable(identifier, (CatalogTable) 
table, tableFactory);
                } else if (table instanceof CatalogView) {
-                       return convertCatalogView(tablePath, (CatalogView) 
table);
+                       return convertCatalogView(identifier.getObjectName(), 
(CatalogView) table);
                } else {
                        throw new TableException("Unsupported table type: " + 
table);
                }
@@ -125,17 +130,19 @@ class DatabaseCalciteSchema implements Schema {
                }
        }
 
-       private Table convertCatalogTable(ObjectPath tablePath, CatalogTable 
table, @Nullable TableFactory tableFactory) {
+       private Table convertCatalogTable(ObjectIdentifier identifier, 
CatalogTable table, @Nullable TableFactory tableFactory) {
                final TableSource<?> tableSource;
+               final TableSourceFactory.Context context = new 
TableSourceFactoryContextImpl(
+                               identifier, table, 
tableConfig.getConfiguration());
                if (tableFactory != null) {
                        if (tableFactory instanceof TableSourceFactory) {
-                               tableSource = ((TableSourceFactory) 
tableFactory).createTableSource(tablePath, table);
+                               tableSource = ((TableSourceFactory) 
tableFactory).createTableSource(context);
                        } else {
                                throw new TableException(
                                        "Cannot query a sink-only table. 
TableFactory provided by catalog must implement TableSourceFactory");
                        }
                } else {
-                       tableSource = 
TableFactoryUtil.findAndCreateTableSource(table);
+                       tableSource = 
TableFactoryUtil.findAndCreateTableSource(context);
                }
 
                if (!(tableSource instanceof StreamTableSource)) {
@@ -153,14 +160,14 @@ class DatabaseCalciteSchema implements Schema {
                );
        }
 
-       private Table convertCatalogView(ObjectPath tableName, CatalogView 
table) {
+       private Table convertCatalogView(String tableName, CatalogView table) {
                TableSchema schema = table.getSchema();
                return new ViewTable(
                        null,
                        typeFactory -> ((FlinkTypeFactory) 
typeFactory).buildLogicalRowType(schema),
                        table.getExpandedQuery(),
                        Arrays.asList(catalogName, databaseName),
-                       Arrays.asList(catalogName, databaseName, 
tableName.getObjectName())
+                       Arrays.asList(catalogName, databaseName, tableName)
                );
        }
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 3faee81..9fd0278 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -27,7 +27,7 @@ import 
org.apache.flink.table.catalog.exceptions.{TableNotExistException => _, _
 import org.apache.flink.table.delegation.Parser
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup
-import org.apache.flink.table.factories.{TableFactoryService, 
TableFactoryUtil, TableSinkFactory}
+import org.apache.flink.table.factories.{TableFactoryUtil, 
TableSinkFactoryContextImpl}
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, 
TableFunction, UserDefinedAggregateFunction, _}
 import org.apache.flink.table.module.{Module, ModuleManager}
 import org.apache.flink.table.operations.ddl._
@@ -102,7 +102,7 @@ abstract class TableEnvImpl(
     new PlanningConfigurationBuilder(
       config,
       functionCatalog,
-      asRootSchema(new CatalogManagerCalciteSchema(catalogManager, 
isStreamingMode)),
+      asRootSchema(new CatalogManagerCalciteSchema(catalogManager, config, 
isStreamingMode)),
       expressionBridge)
 
   private val parser: Parser = new ParserImpl(
@@ -741,18 +741,15 @@ abstract class TableEnvImpl(
 
         val catalog = 
catalogManager.getCatalog(objectIdentifier.getCatalogName)
         val catalogTable = s.asInstanceOf[CatalogTable]
+        val context = new TableSinkFactoryContextImpl(
+          objectIdentifier, catalogTable, config.getConfiguration)
         if (catalog.isPresent && catalog.get().getTableFactory.isPresent) {
-          val sink = TableFactoryUtil.createTableSinkForCatalogTable(
-            catalog.get(),
-            catalogTable,
-            objectIdentifier.toObjectPath)
+          val sink = 
TableFactoryUtil.createTableSinkForCatalogTable(catalog.get(), context)
           if (sink.isPresent) {
             return Option(sink.get())
           }
         }
-        val sinkProperties = catalogTable.toProperties
-        Option(TableFactoryService.find(classOf[TableSinkFactory[_]], 
sinkProperties)
-          .createTableSink(sinkProperties))
+        Option(TableFactoryUtil.findAndCreateTableSink(context))
 
       case _ => None
     }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index 6592f1b..46823f2 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -30,7 +30,7 @@ import org.apache.flink.table.delegation.{Executor, Parser, 
Planner}
 import org.apache.flink.table.executor.StreamExecutor
 import org.apache.flink.table.explain.PlanJsonParser
 import org.apache.flink.table.expressions.{ExpressionBridge, 
PlannerExpression, PlannerExpressionConverter, PlannerTypeInferenceUtilImpl}
-import org.apache.flink.table.factories.{TableFactoryService, 
TableFactoryUtil, TableSinkFactory}
+import org.apache.flink.table.factories.{TableFactoryUtil, 
TableSinkFactoryContextImpl}
 import 
org.apache.flink.table.operations.OutputConversionModifyOperation.UpdateMode
 import org.apache.flink.table.operations._
 import org.apache.flink.table.plan.StreamOptimizer
@@ -51,7 +51,6 @@ import _root_.java.util
 import _root_.java.util.Objects
 import _root_.java.util.function.{Supplier => JSupplier}
 
-import _root_.scala.collection.JavaConversions._
 import _root_.scala.collection.JavaConverters._
 
 /**
@@ -77,7 +76,7 @@ class StreamPlanner(
   
functionCatalog.setPlannerTypeInferenceUtil(PlannerTypeInferenceUtilImpl.INSTANCE)
 
   private val internalSchema: CalciteSchema =
-    asRootSchema(new CatalogManagerCalciteSchema(catalogManager, true))
+    asRootSchema(new CatalogManagerCalciteSchema(catalogManager, config, true))
 
   // temporary bridge between API and planner
   private val expressionBridge: ExpressionBridge[PlannerExpression] =
@@ -434,18 +433,15 @@ class StreamPlanner(
       case Some(s) if s.isInstanceOf[CatalogTable] =>
         val catalog = 
catalogManager.getCatalog(objectIdentifier.getCatalogName)
         val catalogTable = s.asInstanceOf[CatalogTable]
+        val context = new TableSinkFactoryContextImpl(
+          objectIdentifier, catalogTable, config.getConfiguration)
         if (catalog.isPresent && catalog.get().getTableFactory.isPresent) {
-          val sink = TableFactoryUtil.createTableSinkForCatalogTable(
-            catalog.get(),
-            catalogTable,
-            objectIdentifier.toObjectPath)
+          val sink = 
TableFactoryUtil.createTableSinkForCatalogTable(catalog.get(), context)
           if (sink.isPresent) {
             return Option(sink.get())
           }
         }
-        val sinkProperties = catalogTable.toProperties
-        Option(TableFactoryService.find(classOf[TableSinkFactory[_]], 
sinkProperties)
-          .createTableSink(sinkProperties))
+        Option(TableFactoryUtil.findAndCreateTableSink(context))
 
       case _ => None
     }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java
index 69e1438..a850f48 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.catalog;
 
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableSchema;
 import 
org.apache.flink.table.catalog.TestExternalTableSourceFactory.TestExternalTableSource;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -55,7 +56,8 @@ public class DatabaseCalciteSchemaTest {
                DatabaseCalciteSchema calciteSchema = new 
DatabaseCalciteSchema(true,
                        databaseName,
                        catalogName,
-                       catalogManager);
+                       catalogManager,
+                       new TableConfig());
 
                catalog.createTable(new ObjectPath(databaseName, tableName), 
new TestCatalogBaseTable(), false);
                Table table = calciteSchema.getTable(tableName);
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
index 8bd077b..036cc6a 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -95,7 +95,7 @@ public class SqlToOperationConverterTest {
        private final PlanningConfigurationBuilder planningConfigurationBuilder 
=
                new PlanningConfigurationBuilder(tableConfig,
                        functionCatalog,
-                       asRootSchema(new 
CatalogManagerCalciteSchema(catalogManager, false)),
+                       asRootSchema(new 
CatalogManagerCalciteSchema(catalogManager, tableConfig, false)),
                        new ExpressionBridge<>(functionCatalog,
                                PlannerExpressionConverter.INSTANCE()));
 
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/TableFactoryTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/TableFactoryTest.scala
new file mode 100644
index 0000000..9bdbfd1
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/TableFactoryTest.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.table.catalog.{GenericInMemoryCatalog, 
ObjectIdentifier}
+import org.apache.flink.table.factories.TableFactory
+import org.apache.flink.table.utils.{TableTestBase, TestContextTableFactory}
+
+import org.junit.{Assert, Test}
+
+import java.util.Optional
+
+class TableFactoryTest extends TableTestBase {
+
+  private val util = streamTestUtil()
+
+  @Test
+  def testTableSourceSinkFactory(): Unit = {
+    val factory = new TestContextTableFactory(
+      ObjectIdentifier.of("cat", "default", "t1"),
+      ObjectIdentifier.of("cat", "default", "t2"))
+    
util.tableEnv.getConfig.getConfiguration.setBoolean(TestContextTableFactory.REQUIRED_KEY,
 true)
+    util.tableEnv.registerCatalog("cat", new GenericInMemoryCatalog("default") 
{
+      override def getTableFactory: Optional[TableFactory] = 
Optional.of(factory)
+    })
+    util.tableEnv.useCatalog("cat")
+
+    val sourceDDL =
+      """
+        |create table t1(
+        |  a int,
+        |  b varchar,
+        |  c int
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+    val sinkDDL =
+      """
+        |create table t2(
+        |  a int,
+        |  b int
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+    val query =
+      """
+        |insert into t2
+        |select t1.a, t1.c from t1
+      """.stripMargin
+    util.tableEnv.sqlUpdate(sourceDDL)
+    util.tableEnv.sqlUpdate(sinkDDL)
+    util.tableEnv.sqlUpdate(query)
+    Assert.assertTrue(factory.hasInvokedSource)
+    Assert.assertTrue(factory.hasInvokedSink)
+  }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TestContextTableFactory.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TestContextTableFactory.scala
new file mode 100644
index 0000000..611076f
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TestContextTableFactory.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import org.apache.flink.configuration.{ConfigOption, ConfigOptions}
+import org.apache.flink.table.catalog.ObjectIdentifier
+import org.apache.flink.table.factories.{TableFactoryUtil, TableSinkFactory, 
TableSourceFactory}
+import org.apache.flink.table.utils.TestContextTableFactory.REQUIRED_KEY
+import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sources.TableSource
+
+import org.junit.Assert
+
+import java.{lang, util}
+
+/**
+  * Test [[TableSourceFactory]] and [[TableSinkFactory]] for context.
+  */
+class TestContextTableFactory[T](
+    sourceIdentifier: ObjectIdentifier,
+    sinkIdentifier: ObjectIdentifier)
+    extends TableSourceFactory[T] with TableSinkFactory[T] {
+
+  var hasInvokedSource = false
+  var hasInvokedSink = false
+
+  override def requiredContext(): util.Map[String, String] = {
+    throw new UnsupportedOperationException
+  }
+
+  override def supportedProperties(): util.List[String] = {
+    throw new UnsupportedOperationException
+  }
+
+  override def createTableSource(context: TableSourceFactory.Context): 
TableSource[T] = {
+    Assert.assertTrue(context.getConfiguration.get(REQUIRED_KEY))
+    Assert.assertEquals(sourceIdentifier, context.getObjectIdentifier)
+    hasInvokedSource = true
+    TableFactoryUtil.findAndCreateTableSource(context)
+  }
+
+  override def createTableSink(context: TableSinkFactory.Context): 
TableSink[T] = {
+    Assert.assertTrue(context.getConfiguration.get(REQUIRED_KEY))
+    Assert.assertEquals(sinkIdentifier, context.getObjectIdentifier)
+    hasInvokedSink = true
+    TableFactoryUtil.findAndCreateTableSink(context)
+  }
+}
+
+object TestContextTableFactory{
+  val REQUIRED_KEY: ConfigOption[lang.Boolean] = ConfigOptions
+      .key("testing.required.key").booleanType().defaultValue(false)
+}

Reply via email to the mailing list.