This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 7b4f39d [FLINK-13170][table-planner] Planner should get table factory from catalog when creating sink for CatalogTable
7b4f39d is described below

commit 7b4f39d9482fcba2526d5fe1adef9850de3473fc
Author: Rui Li <li...@apache.org>
AuthorDate: Tue Jul 9 20:09:20 2019 +0800

    [FLINK-13170][table-planner] Planner should get table factory from catalog when creating sink for CatalogTable

    Planner should first try getting table factory from catalog when creating table sinks for CatalogTable.

    This closes #9039.
---
 .../batch/connectors/hive/HiveTableSinkTest.java | 19 ++++++---------
 .../flink/table/factories/TableFactoryUtil.java | 14 +++++++++++
 .../apache/flink/table/planner/PlannerBase.scala | 15 ++++++++++--
 .../flink/table/plan/batch/sql/SinkTest.scala | 28 ++++++++++++++++++++++
 .../flink/table/api/internal/TableEnvImpl.scala | 13 +++++++++-
 .../apache/flink/table/planner/StreamPlanner.scala | 13 +++++++++-
 6 files changed, 86 insertions(+), 16 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java index fe54eac..d16f2b0 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSinkTest.java @@ -88,9 +88,8 @@ public class HiveTableSinkTest { List<Row> toWrite = generateRecords(5); tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite, rowTypeInfo)); - CatalogTable table = (CatalogTable) hiveCatalog.getTable(tablePath); - tableEnv.registerTableSink("destSink", new HiveTableSink(new JobConf(hiveConf), tablePath, table)); - tableEnv.sqlQuery("select * from src").insertInto("destSink"); + 
tableEnv.registerCatalog("hive", hiveCatalog); + tableEnv.sqlQuery("select * from src").insertInto("hive", "default", "dest"); execEnv.execute(); verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName)); @@ -110,9 +109,8 @@ public class HiveTableSinkTest { List<Row> toWrite = generateRecords(5); tableEnv.registerDataSet("src", execEnv.fromCollection(toWrite, rowTypeInfo)); - CatalogTable table = (CatalogTable) hiveCatalog.getTable(tablePath); - tableEnv.registerTableSink("destSink", new HiveTableSink(new JobConf(hiveConf), tablePath, table)); - tableEnv.sqlQuery("select * from src").insertInto("destSink"); + tableEnv.registerCatalog("hive", hiveCatalog); + tableEnv.sqlQuery("select * from src").insertInto("hive", "default", "dest"); execEnv.execute(); List<CatalogPartitionSpec> partitionSpecs = hiveCatalog.listPartitions(tablePath); @@ -156,9 +154,8 @@ public class HiveTableSinkTest { BatchTableEnvironment tableEnv = BatchTableEnvironment.create(execEnv); tableEnv.registerDataSet("complexSrc", execEnv.fromCollection(toWrite, rowTypeInfo)); - CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath); - tableEnv.registerTableSink("complexSink", new HiveTableSink(new JobConf(hiveConf), tablePath, catalogTable)); - tableEnv.sqlQuery("select * from complexSrc").insertInto("complexSink"); + tableEnv.registerCatalog("hive", hiveCatalog); + tableEnv.sqlQuery("select * from complexSrc").insertInto("hive", "default", "dest"); execEnv.execute(); List<String> result = hiveShell.executeQuery("select * from " + tblName); @@ -185,9 +182,7 @@ public class HiveTableSinkTest { toWrite.add(row); tableEnv.registerDataSet("nestedSrc", execEnv.fromCollection(toWrite, rowTypeInfo)); - catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath); - tableEnv.registerTableSink("nestedSink", new HiveTableSink(new JobConf(hiveConf), tablePath, catalogTable)); - tableEnv.sqlQuery("select * from nestedSrc").insertInto("nestedSink"); + 
tableEnv.sqlQuery("select * from nestedSrc").insertInto("hive", "default", "dest"); execEnv.execute(); result = hiveShell.executeQuery("select * from " + tblName); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java index 1a641f4..6bef62d 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java @@ -19,13 +19,16 @@ package org.apache.flink.table.factories; import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ExternalCatalog; +import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.descriptors.Descriptor; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.sources.TableSource; import java.util.Map; +import java.util.Optional; /** * Utility for dealing with {@link TableFactory} using the {@link TableFactoryService}. @@ -101,4 +104,15 @@ public class TableFactoryUtil { return findAndCreateTableSource(table.toProperties()); } + /** + * Creates a table sink for a {@link CatalogTable} using table factory associated with the catalog. 
+ */ + public static Optional<TableSink> createTableSinkForCatalogTable(Catalog catalog, CatalogTable catalogTable, ObjectPath tablePath) { + TableFactory tableFactory = catalog.getTableFactory().orElse(null); + if (tableFactory instanceof TableSinkFactory) { + return Optional.ofNullable(((TableSinkFactory) tableFactory).createTableSink(tablePath, catalogTable)); + } + return Optional.empty(); + } + } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala index be81a0b..021a650 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/PlannerBase.scala @@ -24,7 +24,7 @@ import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.table.api.{TableConfig, TableEnvironment, TableException} import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory} -import org.apache.flink.table.catalog.{CatalogManager, CatalogManagerCalciteSchema, CatalogTable, ConnectorCatalogTable, FunctionCatalog} +import org.apache.flink.table.catalog.{CatalogManager, CatalogManagerCalciteSchema, CatalogTable, ConnectorCatalogTable, FunctionCatalog, ObjectPath} import org.apache.flink.table.delegation.{Executor, Planner} import org.apache.flink.table.executor.ExecutorBase import org.apache.flink.table.expressions.PlannerTypeInferenceUtilImpl @@ -248,7 +248,18 @@ abstract class PlannerBase( case Some(s) if JavaScalaConversionUtil.toScala(s.getCatalogTable) .exists(_.isInstanceOf[CatalogTable]) => - val sinkProperties = s.getCatalogTable.get().asInstanceOf[CatalogTable].toProperties + val catalog = catalogManager.getCatalog(s.getTablePath.get(0)) + val catalogTable = 
s.getCatalogTable.get().asInstanceOf[CatalogTable] + if (catalog.isPresent && catalog.get().getTableFactory.isPresent) { + val dbName = s.getTablePath.get(1) + val tableName = s.getTablePath.get(2) + val sink = TableFactoryUtil.createTableSinkForCatalogTable( + catalog.get(), catalogTable, new ObjectPath(dbName, tableName)) + if (sink.isPresent) { + return Option(sink.get()) + } + } + val sinkProperties = catalogTable.toProperties Option(TableFactoryService.find(classOf[TableSinkFactory[_]], sinkProperties) .createTableSink(sinkProperties)) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SinkTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SinkTest.scala index c9bfcc4..a409ffe 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SinkTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/SinkTest.scala @@ -19,12 +19,19 @@ package org.apache.flink.table.plan.batch.sql import org.apache.flink.api.scala._ +import org.apache.flink.table.api.{DataTypes, TableSchema} import org.apache.flink.table.api.scala._ +import org.apache.flink.table.catalog.{CatalogTableImpl, GenericInMemoryCatalog, ObjectPath} +import org.apache.flink.table.factories.TableSinkFactory import org.apache.flink.table.plan.optimize.RelNodeBlockPlanBuilder +import org.apache.flink.table.sinks.TableSink import org.apache.flink.table.types.logical.{BigIntType, IntType} import org.apache.flink.table.util.TableTestBase +import java.util.Optional import org.junit.Test +import org.mockito.{ArgumentMatchers, Mockito} +import scala.collection.JavaConverters._ class SinkTest extends TableTestBase { @@ -60,4 +67,25 @@ class SinkTest extends TableTestBase { util.verifyPlan() } + @Test + def testCatalogTableSink(): Unit = { + val schemaBuilder = new TableSchema.Builder() + schemaBuilder.fields(Array("i"), 
Array(DataTypes.INT())) + val schema = schemaBuilder.build() + val sink = util.createCollectTableSink(schema.getFieldNames, Array(INT)) + val catalog = Mockito.spy(new GenericInMemoryCatalog("dummy")) + val factory = Mockito.mock(classOf[TableSinkFactory[_]]) + Mockito.when[Optional[_]](catalog.getTableFactory).thenReturn(Optional.of(factory)) + Mockito.when[TableSink[_]](factory.createTableSink( + ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(sink) + util.tableEnv.registerCatalog(catalog.getName, catalog) + util.tableEnv.useCatalog(catalog.getName) + val catalogTable = new CatalogTableImpl(schema, Map[String, String]().asJava, "") + catalog.createTable(new ObjectPath("default", "tbl"), catalogTable, false) + util.tableEnv.sqlQuery("select 1").insertInto("tbl") + util.tableEnv.explain(false) + // verify we tried to get table factory from catalog + Mockito.verify(catalog, Mockito.atLeast(1)).getTableFactory + } + } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala index abb94c9..13d6c53 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala @@ -552,7 +552,18 @@ abstract class TableEnvImpl( case Some(s) if JavaScalaConversionUtil.toScala(s.getCatalogTable) .exists(_.isInstanceOf[CatalogTable]) => - val sinkProperties = s.getCatalogTable.get().asInstanceOf[CatalogTable].toProperties + val catalog = catalogManager.getCatalog(s.getTablePath.get(0)) + val catalogTable = s.getCatalogTable.get().asInstanceOf[CatalogTable] + if (catalog.isPresent && catalog.get().getTableFactory.isPresent) { + val dbName = s.getTablePath.get(1) + val tableName = s.getTablePath.get(2) + val sink = TableFactoryUtil.createTableSinkForCatalogTable( + 
catalog.get(), catalogTable, new ObjectPath(dbName, tableName)) + if (sink.isPresent) { + return Option(sink.get()) + } + } + val sinkProperties = catalogTable.toProperties Option(TableFactoryService.find(classOf[TableSinkFactory[_]], sinkProperties) .createTableSink(sinkProperties)) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala index aa41b0c..de163a3 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala @@ -415,7 +415,18 @@ class StreamPlanner( case Some(s) if JavaScalaConversionUtil.toScala(s.getCatalogTable) .exists(_.isInstanceOf[CatalogTable]) => - val sinkProperties = s.getCatalogTable.get().asInstanceOf[CatalogTable].toProperties + val catalog = catalogManager.getCatalog(s.getTablePath.get(0)) + val catalogTable = s.getCatalogTable.get().asInstanceOf[CatalogTable] + if (catalog.isPresent && catalog.get().getTableFactory.isPresent) { + val dbName = s.getTablePath.get(1) + val tableName = s.getTablePath.get(2) + val sink = TableFactoryUtil.createTableSinkForCatalogTable( + catalog.get(), catalogTable, new ObjectPath(dbName, tableName)) + if (sink.isPresent) { + return Option(sink.get()) + } + } + val sinkProperties = catalogTable.toProperties Option(TableFactoryService.find(classOf[TableSinkFactory[_]], sinkProperties) .createTableSink(sinkProperties))