This is an automated email from the ASF dual-hosted git repository. fhueske pushed a commit to branch release-1.6 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 65cd7b55eb7c0c7af840eebdddf83ae46d28e204 Author: jerryjzhang <[email protected]> AuthorDate: Fri Sep 14 16:26:36 2018 +0800 [FLINK-10079] [table] Look up sink tables in external catalogs. This closes #6508. --- .../flink/table/api/BatchTableEnvironment.scala | 4 +- .../flink/table/api/StreamTableEnvironment.scala | 4 +- .../apache/flink/table/api/TableEnvironment.scala | 60 ++++++++---- .../table/catalog/ExternalCatalogSchema.scala | 2 +- .../table/api/ExternalCatalogInsertTest.scala | 107 +++++++++++++++++++++ .../table/catalog/ExternalCatalogSchemaTest.scala | 2 +- .../flink/table/runtime/utils/CommonTestData.scala | 23 +++++ 7 files changed, 179 insertions(+), 23 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala index 9265f0f..04a7916 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala @@ -107,7 +107,7 @@ abstract class BatchTableEnvironment( // check for proper batch table source case batchTableSource: BatchTableSource[_] => // check if a table (source or sink) is registered - Option(getTable(name)) match { + getTable(name) match { // table source and/or sink is registered case Some(table: TableSourceSinkTable[_, _]) => table.tableSourceTable match { @@ -249,7 +249,7 @@ abstract class BatchTableEnvironment( case _: BatchTableSink[_] => // check if a table (source or sink) is registered - Option(getTable(name)) match { + getTable(name) match { // table source and/or sink is registered case Some(table: TableSourceSinkTable[_, _]) => table.tableSinkTable match { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index 4c73032..d31ce6c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -126,7 +126,7 @@ abstract class StreamTableEnvironment( } // register - Option(getTable(name)) match { + getTable(name) match { // check if a table (source or sink) is registered case Some(table: TableSourceSinkTable[_, _]) => table.tableSourceTable match { @@ -273,7 +273,7 @@ abstract class StreamTableEnvironment( case _: StreamTableSink[_] => // check if a table (source or sink) is registered - Option(getTable(name)) match { + getTable(name) match { // table source and/or sink is registered case Some(table: TableSourceSinkTable[_, _]) => table.tableSinkTable match { diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 195812d..5691ab7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -28,7 +28,6 @@ import org.apache.calcite.plan.RelOptPlanner.CannotPlanException import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder} import org.apache.calcite.plan.{RelOptPlanner, RelOptUtil, RelTraitSet} import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.schema.SchemaPlus import org.apache.calcite.schema.impl.AbstractTable import org.apache.calcite.sql._ @@ -749,42 +748,42 @@ abstract class TableEnvironment(val config: TableConfig) { // check that sink table exists if (null == sinkTableName) throw TableException("Name of TableSink must not be null.") if (sinkTableName.isEmpty) throw TableException("Name of TableSink must not be empty.") - if (!isRegistered(sinkTableName)) { - throw TableException(s"No table was registered under the name $sinkTableName.") - } getTable(sinkTableName) match { - // check for registered table that wraps a sink - case s: TableSourceSinkTable[_, _] if s.tableSinkTable.isDefined => + case None => + throw TableException(s"No table was registered under the name $sinkTableName.") + + case Some(s: TableSourceSinkTable[_, _]) if s.tableSinkTable.isDefined => val tableSink = s.tableSinkTable.get.tableSink // validate schema of source table and table sink val srcFieldTypes = table.getSchema.getTypes val sinkFieldTypes = tableSink.getFieldTypes if (srcFieldTypes.length != sinkFieldTypes.length || - srcFieldTypes.zip(sinkFieldTypes).exists{case (srcF, snkF) => srcF != snkF}) { + srcFieldTypes.zip(sinkFieldTypes).exists { case (srcF, snkF) => srcF != snkF }) { val srcFieldNames = table.getSchema.getColumnNames val sinkFieldNames = tableSink.getFieldNames // format table and table sink schema strings val srcSchema = srcFieldNames.zip(srcFieldTypes) - .map{case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}"} + .map { case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}" } .mkString("[", ", ", "]") val sinkSchema = sinkFieldNames.zip(sinkFieldTypes) - .map{case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}"} + .map { case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}" } .mkString("[", ", ", "]") throw ValidationException( - s"Field types of query result and registered TableSink $sinkTableName do not match.\n" + + s"Field types of query result and registered TableSink " + + s"$sinkTableName do not match.\n" + s"Query result schema: $srcSchema\n" + s"TableSink schema: $sinkSchema") } - // emit the table to the configured table sink writeToSink(table, tableSink, conf) - case _ => + + case Some(_) => throw TableException(s"The table registered as $sinkTableName is not a TableSink. " + s"You can only emit query results to a registered TableSink.") } @@ -828,12 +827,39 @@ abstract class TableEnvironment(val config: TableConfig) { rootSchema.getTableNames.contains(name) } - protected def getTable(name: String): org.apache.calcite.schema.Table = { - rootSchema.getTable(name) - } + /** + * Get a table from either internal or external catalogs. + * + * @param name The name of the table. + * @return The table registered either internally or externally, None otherwise. + */ + protected def getTable(name: String): Option[org.apache.calcite.schema.Table] = { + + // recursively fetches a table from a schema. + def getTableFromSchema( + schema: SchemaPlus, + path: List[String]): Option[org.apache.calcite.schema.Table] = { + + path match { + case tableName :: Nil => + // look up table + Option(schema.getTable(tableName)) + case subschemaName :: remain => + // look up subschema + val subschema = Option(schema.getSubSchema(subschemaName)) + subschema match { + case Some(s) => + // search for table in subschema + getTableFromSchema(s, remain) + case None => + // subschema does not exist + None + } + } + } - protected def getRowType(name: String): RelDataType = { - rootSchema.getTable(name).getRowType(typeFactory) + val pathNames = name.split('.').toList + getTableFromSchema(rootSchema, pathNames) } /** Returns a unique temporary attribute name. */ diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala index adac938..c3eac8e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala @@ -94,7 +94,7 @@ class ExternalCatalogSchema( override def getFunctionNames: JSet[String] = JCollections.emptySet[String] - override def getTableNames: JSet[String] = JCollections.emptySet[String] + override def getTableNames: JSet[String] = new JLinkedHashSet(catalog.listTables()) override def snapshot(v: SchemaVersion): Schema = this diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogInsertTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogInsertTest.scala new file mode 100644 index 0000000..4b1fb18 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogInsertTest.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api + +import org.apache.flink.api.scala.ExecutionEnvironment +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.runtime.utils.CommonTestData +import org.apache.flink.table.utils.TableTestBase +import org.junit.Test + +/** + * Test for inserting into tables from external catalog. + */ +class ExternalCatalogInsertTest extends TableTestBase { + private val tableBatchEnv = TableEnvironment.getTableEnvironment( + ExecutionEnvironment.getExecutionEnvironment) + private val tableStreamEnv = TableEnvironment.getTableEnvironment( + StreamExecutionEnvironment.getExecutionEnvironment) + + @Test + def testBatchTableApi(): Unit = { + tableBatchEnv.registerExternalCatalog( + "test", + CommonTestData.getInMemoryTestCatalog(isStreaming = false)) + + val table1 = tableBatchEnv.scan("test", "db1", "tb1") + val table2 = tableBatchEnv.scan("test", "db2", "tb2") + table2.select('d * 2, 'e, 'g.upperCase()) + .unionAll(table1.select('a * 2, 'b, 'c.upperCase())) + .insertInto("test.db3.tb3") + } + + @Test + def testBatchSQL(): Unit = { + tableBatchEnv.registerExternalCatalog( + "test", + CommonTestData.getInMemoryTestCatalog(isStreaming = false)) + + val sqlInsert = "INSERT INTO `test.db3.tb3` SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 " + + "UNION ALL (SELECT a * 2, b, c FROM test.db1.tb1)" + + tableBatchEnv.sqlUpdate(sqlInsert) + } + + @Test + def testStreamTableApi(): Unit = { + var tableEnv = tableStreamEnv + + tableEnv.registerExternalCatalog( + "test", + CommonTestData.getInMemoryTestCatalog(isStreaming = true)) + + val table1 = tableEnv.scan("test", "db1", "tb1") + val table2 = tableEnv.scan("test", "db2", "tb2") + + table2.where("d < 3") + .select('d * 2, 'e, 'g.upperCase()) + .unionAll(table1.select('a * 2, 'b, 'c.upperCase())) + .insertInto("test.db3.tb3") + } + + @Test + def testStreamSQL(): Unit = { + var tableEnv = tableStreamEnv + + tableEnv.registerExternalCatalog( + "test", + CommonTestData.getInMemoryTestCatalog(isStreaming = true)) + + val sqlInsert = "INSERT INTO `test.db3.tb3` SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 " + + "UNION ALL (SELECT a * 2, b, c FROM test.db1.tb1)" + + tableEnv.sqlUpdate(sqlInsert) + } + + @Test + def testTopLevelTable(): Unit = { + var tableEnv = tableBatchEnv + + tableEnv.registerExternalCatalog( + "test", + CommonTestData.getInMemoryTestCatalog(isStreaming = false)) + + val table1 = tableEnv.scan("test", "tb1") + val table2 = tableEnv.scan("test", "db2", "tb2") + table2.select('d * 2, 'e, 'g.upperCase()) + .unionAll(table1.select('a * 2, 'b, 'c.upperCase())) + .insertInto("test.tb3") + } +} diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala index c98a7c1..2ca7fba 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala @@ -71,7 +71,7 @@ class ExternalCatalogSchemaTest extends TableTestBase { .filter(_.getType.equals(SqlMonikerType.SCHEMA)) .map(_.getFullyQualifiedNames.asScala.toList).toSet assertTrue(Set(List(schemaName), List(schemaName, "db1"), - List(schemaName, "db2")) == subSchemas) + List(schemaName, "db2"), List(schemaName, "db3")) == subSchemas) } @Test diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala index e62396f..64fcc8a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala @@ -129,16 +129,39 @@ object CommonTestData { externalTableBuilder2.inAppendMode() } + val tempFilePath3 = writeToTempFile("", "csv-test3", "tmp") + val connDesc3 = FileSystem().path(tempFilePath3) + val formatDesc3 = Csv() + .field("x", Types.INT) + .field("y", Types.LONG) + .field("z", Types.STRING) + .fieldDelimiter("#") + val schemaDesc3 = Schema() + .field("x", Types.INT) + .field("y", Types.LONG) + .field("z", Types.STRING) + val externalTableBuilder3 = ExternalCatalogTable.builder(connDesc3) + .withFormat(formatDesc3) + .withSchema(schemaDesc3) + + if (isStreaming) { + externalTableBuilder3.inAppendMode() + } + val catalog = new InMemoryExternalCatalog("test") val db1 = new InMemoryExternalCatalog("db1") val db2 = new InMemoryExternalCatalog("db2") + val db3 = new InMemoryExternalCatalog("db3") catalog.createSubCatalog("db1", db1, ignoreIfExists = false) catalog.createSubCatalog("db2", db2, ignoreIfExists = false) + catalog.createSubCatalog("db3", db3, ignoreIfExists = false) // Register the table with both catalogs catalog.createTable("tb1", externalTableBuilder1.asTableSource(), ignoreIfExists = false) + catalog.createTable("tb3", externalTableBuilder3.asTableSink(), ignoreIfExists = false) db1.createTable("tb1", externalTableBuilder1.asTableSource(), ignoreIfExists = false) db2.createTable("tb2", externalTableBuilder2.asTableSource(), ignoreIfExists = false) + db3.createTable("tb3", externalTableBuilder3.asTableSink(), ignoreIfExists = false) catalog }
