This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 7b4f39d  [FLINK-13170][table-planner] Planner should get table factory from catalog when creating sink for CatalogTable
7b4f39d is described below

commit 7b4f39d9482fcba2526d5fe1adef9850de3473fc
Author: Rui Li <li...@apache.org>
AuthorDate: Tue Jul 9 20:09:20 2019 +0800

    [FLINK-13170][table-planner] Planner should get table factory from catalog when creating sink for CatalogTable
    
    Planner should first try getting table factory from catalog when creating table sinks for CatalogTable.
    
    This closes #9039.
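    
    In short, sink creation becomes a two-step lookup: the planner first asks
    the catalog owning the target table for its own TableFactory and builds the
    sink from it when that factory is a TableSinkFactory; only otherwise does
    it fall back to the previous behavior of discovering a factory from the
    table's properties via TableFactoryService. A minimal Java sketch of that
    order, reusing the TableFactoryUtil helper added in this commit (the
    SinkLookupSketch class and resolveSink wrapper are illustrative only; in
    the patch this logic is inlined in PlannerBase, TableEnvImpl and
    StreamPlanner):
    
    import java.util.Map;
    import java.util.Optional;
    
    import org.apache.flink.table.catalog.Catalog;
    import org.apache.flink.table.catalog.CatalogTable;
    import org.apache.flink.table.catalog.ObjectPath;
    import org.apache.flink.table.factories.TableFactoryService;
    import org.apache.flink.table.factories.TableFactoryUtil;
    import org.apache.flink.table.factories.TableSinkFactory;
    import org.apache.flink.table.sinks.TableSink;
    
    public class SinkLookupSketch {
    
        // Illustrative wrapper; the patch inlines this logic in the planners.
        static TableSink<?> resolveSink(Catalog catalog, CatalogTable table, ObjectPath path) {
            // Step 1: prefer a sink built by the catalog's own table factory,
            // e.g. HiveCatalog handing out a Hive-specific sink.
            Optional<TableSink> fromCatalog =
                TableFactoryUtil.createTableSinkForCatalogTable(catalog, table, path);
            if (fromCatalog.isPresent()) {
                return fromCatalog.get();
            }
            // Step 2: fall back to discovering a factory from the table's
            // properties, which was the only path before this change.
            Map<String, String> properties = table.toProperties();
            return TableFactoryService.find(TableSinkFactory.class, properties)
                .createTableSink(properties);
        }
    }
    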
---
 .../batch/connectors/hive/HiveTableSinkTest.java   | 19 ++++++---------
 .../flink/table/factories/TableFactoryUtil.java    | 14 +++++++++++
 .../apache/flink/table/planner/PlannerBase.scala   | 15 ++++++++++--
 .../flink/table/plan/batch/sql/SinkTest.scala      | 28 ++++++++++++++++++++++
 .../flink/table/api/internal/TableEnvImpl.scala    | 13 +++++++++-
 .../apache/flink/table/planner/StreamPlanner.scala | 13 +++++++++-
 6 files changed, 86 insertions(+), 16 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
index fe54eac..d16f2b0 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java
@@ -88,9 +88,8 @@ public class HiveTableSinkTest {
                List<Row> toWrite = generateRecords(5);
                tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite, 
rowTypeInfo));
 
-               CatalogTable table = (CatalogTable) hiveCatalog.getTable(tablePath);
-               tableEnv.registerTableSink("destSink", new HiveTableSink(new JobConf(hiveConf), tablePath, table));
-               tableEnv.sqlQuery("select * from src").insertInto("destSink");
+               tableEnv.registerCatalog("hive", hiveCatalog);
+               tableEnv.sqlQuery("select * from src").insertInto("hive", "default", "dest");
                execEnv.execute();
 
                verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName));
@@ -110,9 +109,8 @@ public class HiveTableSinkTest {
                List<Row> toWrite = generateRecords(5);
                tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite, 
rowTypeInfo));
 
-               CatalogTable table = (CatalogTable) hiveCatalog.getTable(tablePath);
-               tableEnv.registerTableSink("destSink", new HiveTableSink(new JobConf(hiveConf), tablePath, table));
-               tableEnv.sqlQuery("select * from src").insertInto("destSink");
+               tableEnv.registerCatalog("hive", hiveCatalog);
+               tableEnv.sqlQuery("select * from src").insertInto("hive", "default", "dest");
                execEnv.execute();
 
                List<CatalogPartitionSpec> partitionSpecs = hiveCatalog.listPartitions(tablePath);
@@ -156,9 +154,8 @@ public class HiveTableSinkTest {
                BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv);
                tableEnv.registerDataSet("complexSrc", execEnv.fromCollection(toWrite, rowTypeInfo));
 
-               CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
-               tableEnv.registerTableSink("complexSink", new HiveTableSink(new JobConf(hiveConf), tablePath, catalogTable));
-               tableEnv.sqlQuery("select * from complexSrc").insertInto("complexSink");
+               tableEnv.registerCatalog("hive", hiveCatalog);
+               tableEnv.sqlQuery("select * from complexSrc").insertInto("hive", "default", "dest");
                execEnv.execute();
 
                List<String> result = hiveShell.executeQuery("select * from " + tblName);
@@ -185,9 +182,7 @@ public class HiveTableSinkTest {
                toWrite.add(row);
 
                tableEnv.registerDataSet("nestedSrc", 
execEnv.fromCollection(toWrite, rowTypeInfo));
-               catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
-               tableEnv.registerTableSink("nestedSink", new HiveTableSink(new JobConf(hiveConf), tablePath, catalogTable));
-               tableEnv.sqlQuery("select * from nestedSrc").insertInto("nestedSink");
+               tableEnv.sqlQuery("select * from nestedSrc").insertInto("hive", "default", "dest");
                execEnv.execute();
 
                result = hiveShell.executeQuery("select * from " + tblName);
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
index 1a641f4..6bef62d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java
@@ -19,13 +19,16 @@
 package org.apache.flink.table.factories;
 
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ExternalCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.descriptors.Descriptor;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.TableSource;
 
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * Utility for dealing with {@link TableFactory} using the {@link TableFactoryService}.
@@ -101,4 +104,15 @@ public class TableFactoryUtil {
                return findAndCreateTableSource(table.toProperties());
        }
 
+       /**
+        * Creates a table sink for a {@link CatalogTable} using table factory associated with the catalog.
+        */
+       public static Optional<TableSink> createTableSinkForCatalogTable(Catalog catalog, CatalogTable catalogTable, ObjectPath tablePath) {
+               TableFactory tableFactory = catalog.getTableFactory().orElse(null);
+               if (tableFactory instanceof TableSinkFactory) {
+                       return Optional.ofNullable(((TableSinkFactory) tableFactory).createTableSink(tablePath, catalogTable));
+               }
+               return Optional.empty();
+       }
+
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
index be81a0b..021a650 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.{TableConfig, TableEnvironment, TableException}
 import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory}
-import org.apache.flink.table.catalog.{CatalogManager, CatalogManagerCalciteSchema, CatalogTable, ConnectorCatalogTable, FunctionCatalog}
+import org.apache.flink.table.catalog.{CatalogManager, CatalogManagerCalciteSchema, CatalogTable, ConnectorCatalogTable, FunctionCatalog, ObjectPath}
 import org.apache.flink.table.delegation.{Executor, Planner}
 import org.apache.flink.table.executor.ExecutorBase
 import org.apache.flink.table.expressions.PlannerTypeInferenceUtilImpl
@@ -248,7 +248,18 @@ abstract class PlannerBase(
       case Some(s) if JavaScalaConversionUtil.toScala(s.getCatalogTable)
         .exists(_.isInstanceOf[CatalogTable]) =>
 
-        val sinkProperties = s.getCatalogTable.get().asInstanceOf[CatalogTable].toProperties
+        val catalog = catalogManager.getCatalog(s.getTablePath.get(0))
+        val catalogTable = s.getCatalogTable.get().asInstanceOf[CatalogTable]
+        if (catalog.isPresent && catalog.get().getTableFactory.isPresent) {
+          val dbName = s.getTablePath.get(1)
+          val tableName = s.getTablePath.get(2)
+          val sink = TableFactoryUtil.createTableSinkForCatalogTable(
+            catalog.get(), catalogTable, new ObjectPath(dbName, tableName))
+          if (sink.isPresent) {
+            return Option(sink.get())
+          }
+        }
+        val sinkProperties = catalogTable.toProperties
         Option(TableFactoryService.find(classOf[TableSinkFactory[_]], sinkProperties)
           .createTableSink(sinkProperties))
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SinkTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SinkTest.scala
index c9bfcc4..a409ffe 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SinkTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SinkTest.scala
@@ -19,12 +19,19 @@
 package org.apache.flink.table.plan.batch.sql
 
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.{DataTypes, TableSchema}
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.catalog.{CatalogTableImpl, GenericInMemoryCatalog, ObjectPath}
+import org.apache.flink.table.factories.TableSinkFactory
 import org.apache.flink.table.plan.optimize.RelNodeBlockPlanBuilder
+import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.types.logical.{BigIntType, IntType}
 import org.apache.flink.table.util.TableTestBase
 
+import java.util.Optional
 import org.junit.Test
+import org.mockito.{ArgumentMatchers, Mockito}
+import scala.collection.JavaConverters._
 
 class SinkTest extends TableTestBase {
 
@@ -60,4 +67,25 @@ class SinkTest extends TableTestBase {
     util.verifyPlan()
   }
 
+  @Test
+  def testCatalogTableSink(): Unit = {
+    val schemaBuilder = new TableSchema.Builder()
+    schemaBuilder.fields(Array("i"), Array(DataTypes.INT()))
+    val schema = schemaBuilder.build()
+    val sink = util.createCollectTableSink(schema.getFieldNames, Array(INT))
+    val catalog = Mockito.spy(new GenericInMemoryCatalog("dummy"))
+    val factory = Mockito.mock(classOf[TableSinkFactory[_]])
+    Mockito.when[Optional[_]](catalog.getTableFactory).thenReturn(Optional.of(factory))
+    Mockito.when[TableSink[_]](factory.createTableSink(
+      ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(sink)
+    util.tableEnv.registerCatalog(catalog.getName, catalog)
+    util.tableEnv.useCatalog(catalog.getName)
+    val catalogTable = new CatalogTableImpl(schema, Map[String, String]().asJava, "")
+    catalog.createTable(new ObjectPath("default", "tbl"), catalogTable, false)
+    util.tableEnv.sqlQuery("select 1").insertInto("tbl")
+    util.tableEnv.explain(false)
+    // verify we tried to get table factory from catalog
+    Mockito.verify(catalog, Mockito.atLeast(1)).getTableFactory
+  }
+
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index abb94c9..13d6c53 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -552,7 +552,18 @@ abstract class TableEnvImpl(
       case Some(s) if JavaScalaConversionUtil.toScala(s.getCatalogTable)
         .exists(_.isInstanceOf[CatalogTable]) =>
 
-        val sinkProperties = s.getCatalogTable.get().asInstanceOf[CatalogTable].toProperties
+        val catalog = catalogManager.getCatalog(s.getTablePath.get(0))
+        val catalogTable = s.getCatalogTable.get().asInstanceOf[CatalogTable]
+        if (catalog.isPresent && catalog.get().getTableFactory.isPresent) {
+          val dbName = s.getTablePath.get(1)
+          val tableName = s.getTablePath.get(2)
+          val sink = TableFactoryUtil.createTableSinkForCatalogTable(
+            catalog.get(), catalogTable, new ObjectPath(dbName, tableName))
+          if (sink.isPresent) {
+            return Option(sink.get())
+          }
+        }
+        val sinkProperties = catalogTable.toProperties
         Option(TableFactoryService.find(classOf[TableSinkFactory[_]], sinkProperties)
           .createTableSink(sinkProperties))
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index aa41b0c..de163a3 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -415,7 +415,18 @@ class StreamPlanner(
       case Some(s) if JavaScalaConversionUtil.toScala(s.getCatalogTable)
         .exists(_.isInstanceOf[CatalogTable]) =>
 
-        val sinkProperties = s.getCatalogTable.get().asInstanceOf[CatalogTable].toProperties
+        val catalog = catalogManager.getCatalog(s.getTablePath.get(0))
+        val catalogTable = s.getCatalogTable.get().asInstanceOf[CatalogTable]
+        if (catalog.isPresent && catalog.get().getTableFactory.isPresent) {
+          val dbName = s.getTablePath.get(1)
+          val tableName = s.getTablePath.get(2)
+          val sink = TableFactoryUtil.createTableSinkForCatalogTable(
+            catalog.get(), catalogTable, new ObjectPath(dbName, tableName))
+          if (sink.isPresent) {
+            return Option(sink.get())
+          }
+        }
+        val sinkProperties = catalogTable.toProperties
         Option(TableFactoryService.find(classOf[TableSinkFactory[_]], sinkProperties)
           .createTableSink(sinkProperties))
 
