fhueske commented on a change in pull request #6508: [Flink-10079] [table]
Support external sink table in the INSERT INTO clause
URL: https://github.com/apache/flink/pull/6508#discussion_r218205054
##########
File path:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
##########
@@ -749,44 +748,48 @@ abstract class TableEnvironment(val config: TableConfig)
{
// check that sink table exists
if (null == sinkTableName) throw TableException("Name of TableSink must
not be null.")
if (sinkTableName.isEmpty) throw TableException("Name of TableSink must
not be empty.")
- if (!isRegistered(sinkTableName)) {
- throw TableException(s"No table was registered under the name
$sinkTableName.")
- }
getTable(sinkTableName) match {
// check for registered table that wraps a sink
- case s: TableSourceSinkTable[_, _] if s.tableSinkTable.isDefined =>
- val tableSink = s.tableSinkTable.get.tableSink
- // validate schema of source table and table sink
- val srcFieldTypes = table.getSchema.getTypes
- val sinkFieldTypes = tableSink.getFieldTypes
-
- if (srcFieldTypes.length != sinkFieldTypes.length ||
- srcFieldTypes.zip(sinkFieldTypes).exists{case (srcF, snkF) => srcF
!= snkF}) {
-
- val srcFieldNames = table.getSchema.getColumnNames
- val sinkFieldNames = tableSink.getFieldNames
-
- // format table and table sink schema strings
- val srcSchema = srcFieldNames.zip(srcFieldTypes)
- .map{case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}"}
- .mkString("[", ", ", "]")
- val sinkSchema = sinkFieldNames.zip(sinkFieldTypes)
- .map{case (n, t) => s"$n: ${t.getTypeClass.getSimpleName}"}
- .mkString("[", ", ", "]")
-
- throw ValidationException(
- s"Field types of query result and registered TableSink
$sinkTableName do not match.\n" +
- s"Query result schema: $srcSchema\n" +
- s"TableSink schema: $sinkSchema")
- }
+ case Some(s: TableSourceSinkTable[_, _]) => s.tableSinkTable match {
Review comment:
We can also check with `case None => ` whether we found a table or not and
avoid nesting `match` blocks.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services