This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 33cc293beea3bf02e431ceaab1ba787c7ee26cc3
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Thu Jul 18 14:44:52 2019 +0200

    [hotfix][table] Unify default catalog & builtin catalog naming
---
 .../java/org/apache/flink/table/api/Table.java     |  2 +-
 .../table/api/internal/TableEnvironmentImpl.java   | 21 +++++++---------
 .../apache/flink/table/catalog/CatalogManager.java | 28 ++++++++++++++++------
 .../flink/table/catalog/FunctionCatalog.java       | 13 +++++-----
 .../table/api/scala/BatchTableEnvironment.scala    |  4 ++--
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  |  5 ++--
 .../flink/table/api/internal/TableEnvImpl.scala    | 27 +++++++++++----------
 7 files changed, 56 insertions(+), 44 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index 0087f94..70350fa 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -830,7 +830,7 @@ public interface Table {
 
        /**
         * Writes the {@link Table} to a {@link TableSink} that was registered under the specified name
-        * in the initial default catalog.
+        * in the built-in catalog.
         *
         * <p>A batch {@link Table} can only be written to a
         * {@code org.apache.flink.table.sinks.BatchTableSink}, a streaming {@link Table} requires a
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 97d27f0..0b5d5fe 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -84,9 +84,6 @@ public class TableEnvironmentImpl implements TableEnvironment {
        // and this should always be true. This avoids too many hard code.
        private static final boolean IS_STREAM_TABLE = true;
        private final CatalogManager catalogManager;
-
-       private final String builtinCatalogName;
-       private final String builtinDatabaseName;
        private final OperationTreeBuilder operationTreeBuilder;
        private final List<ModifyOperation> bufferedModifyOperations = new ArrayList<>();
 
@@ -106,10 +103,6 @@ public class TableEnvironmentImpl implements TableEnvironment {
                this.execEnv = executor;
 
                this.tableConfig = tableConfig;
-               // The current catalog and database are definitely builtin,
-               // see #create(EnvironmentSettings)
-               this.builtinCatalogName = catalogManager.getCurrentCatalog();
-               this.builtinDatabaseName = catalogManager.getCurrentDatabase();
 
                this.functionCatalog = functionCatalog;
                this.planner = planner;
@@ -485,8 +478,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
        protected void registerTableInternal(String name, CatalogBaseTable table) {
                try {
                        checkValidTableName(name);
-                       ObjectPath path = new ObjectPath(builtinDatabaseName, name);
-                       Optional<Catalog> catalog = catalogManager.getCatalog(builtinCatalogName);
+                       ObjectPath path = new ObjectPath(catalogManager.getBuiltInDatabaseName(), name);
+                       Optional<Catalog> catalog = catalogManager.getCatalog(catalogManager.getBuiltInCatalogName());
                        if (catalog.isPresent()) {
                                catalog.get().createTable(
                                        path,
@@ -500,8 +493,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
 
        private void replaceTableInternal(String name, CatalogBaseTable table) {
                try {
-                       ObjectPath path = new ObjectPath(builtinDatabaseName, name);
-                       Optional<Catalog> catalog = catalogManager.getCatalog(builtinCatalogName);
+                       ObjectPath path = new ObjectPath(catalogManager.getBuiltInDatabaseName(), name);
+                       Optional<Catalog> catalog = catalogManager.getCatalog(catalogManager.getBuiltInCatalogName());
                        if (catalog.isPresent()) {
                                catalog.get().alterTable(
                                        path,
@@ -521,7 +514,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
 
        private void registerTableSourceInternal(String name, TableSource<?> tableSource) {
                validateTableSource(tableSource);
-               Optional<CatalogBaseTable> table = getCatalogTable(builtinCatalogName, builtinDatabaseName, name);
+               Optional<CatalogBaseTable> table = getCatalogTable(catalogManager.getBuiltInCatalogName(),
+                       catalogManager.getBuiltInDatabaseName(), name);
 
                if (table.isPresent()) {
                        if (table.get() instanceof ConnectorCatalogTable<?, ?>) {
@@ -546,7 +540,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
        }
 
        private void registerTableSinkInternal(String name, TableSink<?> tableSink) {
-               Optional<CatalogBaseTable> table = getCatalogTable(builtinCatalogName, builtinDatabaseName, name);
+               Optional<CatalogBaseTable> table = getCatalogTable(catalogManager.getBuiltInCatalogName(),
+                       catalogManager.getBuiltInDatabaseName(), name);
 
                if (table.isPresent()) {
                        if (table.get() instanceof ConnectorCatalogTable<?, ?>) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index c5d0bc7..5933487 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -64,8 +64,8 @@ public class CatalogManager {
 
        private String currentDatabaseName;
 
-       // The name of the default catalog
-       private final String defaultCatalogName;
+       // The name of the built-in catalog
+       private final String builtInCatalogName;
 
        /**
         * Temporary solution to handle both {@link CatalogBaseTable} and
@@ -128,7 +128,9 @@ public class CatalogManager {
                catalogs.put(defaultCatalogName, defaultCatalog);
                this.currentCatalogName = defaultCatalogName;
                this.currentDatabaseName = defaultCatalog.getDefaultDatabase();
-               this.defaultCatalogName = defaultCatalogName;
+
+               // right now the default catalog is always the built-in one
+               this.builtInCatalogName = defaultCatalogName;
        }
 
        /**
@@ -298,12 +300,24 @@ public class CatalogManager {
        }
 
        /**
-        * Gets the default catalog name.
+        * Gets the built-in catalog name. The built-in catalog is used for storing all non-serializable
+        * transient meta-objects.
+        *
+        * @return the built-in catalog name
+        */
+       public String getBuiltInCatalogName() {
+               return builtInCatalogName;
+       }
+
+       /**
+        * Gets the built-in database name in the built-in catalog. The built-in database is used for storing
+        * all non-serializable transient meta-objects.
         *
-        * @return the default catalog
+        * @return the built-in database name
         */
-       public String getDefaultCatalogName() {
-               return defaultCatalogName;
+       public String getBuiltInDatabaseName() {
+               // The default database of the built-in catalog is also the built-in database.
+               return catalogs.get(getBuiltInCatalogName()).getDefaultDatabase();
        }
 
        /**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 01639b8..1faffde 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -150,9 +150,7 @@ public class FunctionCatalog implements FunctionLookup {
                                .map(FunctionDefinition::toString)
                                .collect(Collectors.toList()));
 
-               return result.stream()
-                       .collect(Collectors.toList())
-                       .toArray(new String[0]);
+               return result.toArray(new String[0]);
        }
 
        @Override
@@ -180,7 +178,7 @@ public class FunctionCatalog implements FunctionLookup {
                                                userCandidate)
                                );
                        } else {
-                               // TODO: should go thru function definition discover service
+                               // TODO: should go through function definition discover service
                        }
                } catch (FunctionNotExistException e) {
                        // Ignore
@@ -206,10 +204,11 @@ public class FunctionCatalog implements FunctionLookup {
                                .map(Function.identity());
                }
 
-               String defaultCatalogName = catalogManager.getDefaultCatalogName();
-
                return foundDefinition.map(definition -> new FunctionLookup.Result(
-                       ObjectIdentifier.of(defaultCatalogName, catalogManager.getCatalog(defaultCatalogName).get().getDefaultDatabase(), name),
+                       ObjectIdentifier.of(
+                               catalogManager.getBuiltInCatalogName(),
+                               catalogManager.getBuiltInDatabaseName(),
+                               name),
                        definition)
                );
        }
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
index 6b3ecbc..516e6b1 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
@@ -295,11 +295,11 @@ object BatchTableEnvironment {
           classOf[ExecutionEnvironment],
           classOf[TableConfig],
           classOf[CatalogManager])
-      val defaultCatalog = "default_catalog"
+      val builtInCatalog = "default_catalog"
       val catalogManager = new CatalogManager(
         "default_catalog",
         new GenericInMemoryCatalog(
-          defaultCatalog,
+          builtInCatalog,
           "default_database")
       )
       const.newInstance(executionEnvironment, tableConfig, catalogManager)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 79631f3..0923bba 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -76,9 +76,10 @@ class FlinkRelMdHandlerTestBase {
   val tableConfig = new TableConfig()
   val rootSchema: SchemaPlus = MetadataTestUtil.initRootSchema()
 
-  val defaultCatalog = "default_catalog"
+  val builtinCatalog = "default_catalog"
+  val builtinDatabase = "default_database"
   val catalogManager = new CatalogManager(
-    defaultCatalog, new GenericInMemoryCatalog(defaultCatalog, "default_database"))
+    builtinCatalog, new GenericInMemoryCatalog(builtinCatalog, builtinDatabase))
 
   // TODO batch RelNode and stream RelNode should have different PlannerContext
   //  and RelOptCluster due to they have different trait definitions.
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 021a732..b75426c 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -60,10 +60,6 @@ abstract class TableEnvImpl(
     private val catalogManager: CatalogManager)
   extends TableEnvironment {
 
-  // The current catalog and database are definitely builtin.
-  protected val builtinCatalogName: String = catalogManager.getCurrentCatalog
-  protected val builtinDatabaseName: String = catalogManager.getCurrentDatabase
-
   // Table API/SQL function catalog
   private[flink] val functionCatalog: FunctionCatalog = new FunctionCatalog(catalogManager)
 
@@ -266,7 +262,9 @@ abstract class TableEnvImpl(
     tableSource: TableSource[_])
   : Unit = {
     // register
-    getCatalogTable(builtinCatalogName, builtinDatabaseName, name) match {
+    getCatalogTable(
+      catalogManager.getBuiltInCatalogName,
+      catalogManager.getBuiltInDatabaseName, name) match {
 
       // check if a table (source or sink) is registered
       case Some(table: ConnectorCatalogTable[_, _]) =>
@@ -293,7 +291,10 @@ abstract class TableEnvImpl(
     tableSink: TableSink[_])
   : Unit = {
     // check if a table (source or sink) is registered
-    getCatalogTable(builtinCatalogName, builtinDatabaseName, name) match {
+    getCatalogTable(
+      catalogManager.getBuiltInCatalogName,
+      catalogManager.getBuiltInDatabaseName,
+      name) match {
 
       // table source and/or sink is registered
       case Some(table: ConnectorCatalogTable[_, _]) =>
@@ -352,27 +353,29 @@ abstract class TableEnvImpl(
 
   protected def registerTableInternal(name: String, table: CatalogBaseTable): Unit = {
     checkValidTableName(name)
-    val path = new ObjectPath(builtinDatabaseName, name)
-    JavaScalaConversionUtil.toScala(catalogManager.getCatalog(builtinCatalogName)) match {
+    val path = new ObjectPath(catalogManager.getBuiltInDatabaseName, name)
+    JavaScalaConversionUtil.toScala(
+      catalogManager.getCatalog(catalogManager.getBuiltInCatalogName)) match {
       case Some(catalog) =>
         catalog.createTable(
           path,
           table,
           false)
-      case None => throw new TableException("The default catalog does not exist.")
+      case None => throw new TableException("The built-in catalog does not exist.")
     }
   }
 
   protected def replaceTableInternal(name: String, table: CatalogBaseTable): Unit = {
     checkValidTableName(name)
-    val path = new ObjectPath(builtinDatabaseName, name)
-    JavaScalaConversionUtil.toScala(catalogManager.getCatalog(builtinCatalogName)) match {
+    val path = new ObjectPath(catalogManager.getBuiltInDatabaseName, name)
+    JavaScalaConversionUtil.toScala(
+      catalogManager.getCatalog(catalogManager.getBuiltInCatalogName)) match {
       case Some(catalog) =>
         catalog.alterTable(
           path,
           table,
           false)
-      case None => throw new TableException("The default catalog does not exist.")
+      case None => throw new TableException("The built-in catalog does not exist.")
     }
   }
 

Reply via email to