fhueske commented on a change in pull request #6508: [Flink-10079] [table] 
Support external sink table in the INSERT INTO clause 
URL: https://github.com/apache/flink/pull/6508#discussion_r217895776
 
 

 ##########
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ##########
 @@ -820,20 +820,59 @@ abstract class TableEnvironment(val config: TableConfig) 
{
 
   /**
     * Checks if a table is registered under the given name.
+    * Internal and external catalogs are both checked.
     *
     * @param name The table name to check.
     * @return true, if a table is registered under the name, false otherwise.
     */
   protected[flink] def isRegistered(name: String): Boolean = {
-    rootSchema.getTableNames.contains(name)
+    var isRegistered = rootSchema.getTableNames.contains(name)
+
+    // check if the table exists in external catalogs
+    if (!isRegistered) {
+      val (externalSchema, externalTableName) = resolveExternalTable(name)
+      if (externalSchema != null) {
+        isRegistered = externalSchema.getTableNames.contains(externalTableName)
+      }
+    }
+
+    return isRegistered
   }
 
   protected def getTable(name: String): org.apache.calcite.schema.Table = {
-    rootSchema.getTable(name)
+    var table = rootSchema.getTable(name)
 
 Review comment:
   Use `val` instead of `var` when possible.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to