dawidwys commented on a change in pull request #8214: [FLINK-11476] [table] 
Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
URL: https://github.com/apache/flink/pull/8214#discussion_r280379269
 
 

 ##########
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ##########
 @@ -215,6 +218,74 @@ abstract class TableEnvironment(val config: TableConfig) {
     rels.map(_.asInstanceOf[ExecNode[_, _]])
   }
 
+  /**
+    * Register an [[ReadableCatalog]] under a unique name.
+    *
+    * @param name the name under which the catalog will be registered
+    * @param catalog the catalog to register
+    * @throws CatalogAlreadyExistsException thrown if the catalog already 
exists
+    */
+  @throws[CatalogAlreadyExistsException]
+  def registerCatalog(name: String, catalog: ReadableCatalog): Unit = {
+    catalogManager.registerCatalog(name, catalog)
+  }
+
+  /**
+    * Get a registered [[ReadableCatalog]].
+    *
+    * @param catalogName name of the catalog to get
+    * @return the requested catalog
+    * @throws CatalogNotExistException thrown if the catalog doesn't exist
+    */
+  @throws[CatalogNotExistException]
+  def getCatalog(catalogName: String): ReadableCatalog = {
+    catalogManager.getCatalog(catalogName)
+  }
+
+  /**
+    * Get the current catalog.
+    *
+    * @return the current catalog in CatalogManager
+    */
+  def getCurrentCatalog(): ReadableCatalog = {
+    catalogManager.getCurrentCatalog
+  }
+
+  /**
+    * Get the current database name.
+    *
+    * @return the current database of the current catalog
+    */
+  def getCurrentDatabaseName(): String = {
+    catalogManager.getCurrentCatalog.getCurrentDatabase
+  }
+
+  /**
+    * Set the current catalog.
+    *
+    * @param name name of the catalog to set as current catalog
+    * @throws CatalogNotExistException thrown if the catalog doesn't exist
+    */
+  @throws[CatalogNotExistException]
+  def setCurrentCatalog(name: String): Unit = {
+    catalogManager.setCurrentCatalog(name)
+  }
+
+  /**
+    * Set the current catalog and current database.
+    *
+    * @param catalogName name of the catalog to set as current catalog
+    * @param databaseName name of the database to set as current database
+    * @throws CatalogNotExistException  thrown if the catalog doesn't exist
+    * @throws DatabaseNotExistException thrown if the database doesn't exist
+    */
+  @throws[CatalogNotExistException]
+  @throws[DatabaseNotExistException]
+  def setCurrentDatabase(catalogName: String, databaseName: String): Unit = {
+    catalogManager.setCurrentCatalog(catalogName)
+    catalogManager.getCurrentCatalog.setCurrentDatabase(databaseName)
 
 Review comment:
   I wonder if we should mutate the current database on 
`ReadableWritableCatalog`. Both current catalog as well as current database are 
session specific properties not catalog specific and therefore I think they 
should be stored in `CatalogManager` atm. 
   
   I understand that different catalogs might have different default databases. 
E.g. I understand if we say `set catalog hive`, it might make sense to set both 
`catalog: hive` and `database: default`. I think though this might be some 
static catalog specific property, e.g. we could replace the 
`get/setCurrentDatabase` with `Optional<String> getDefaultDatabase()`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to