bowenli86 commented on a change in pull request #8214: [FLINK-11476] [table]
Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
URL: https://github.com/apache/flink/pull/8214#discussion_r280600786
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
##########
@@ -215,6 +218,74 @@ abstract class TableEnvironment(val config: TableConfig) {
rels.map(_.asInstanceOf[ExecNode[_, _]])
}
+ /**
+ * Register an [[ReadableCatalog]] under a unique name.
+ *
+ * @param name the name under which the catalog will be registered
+ * @param catalog the catalog to register
+ * @throws CatalogAlreadyExistsException thrown if the catalog already
exists
+ */
+ @throws[CatalogAlreadyExistsException]
+ def registerCatalog(name: String, catalog: ReadableCatalog): Unit = {
+ catalogManager.registerCatalog(name, catalog)
+ }
+
+ /**
+ * Get a registered [[ReadableCatalog]].
+ *
+ * @param catalogName name of the catalog to get
+ * @return the requested catalog
+ * @throws CatalogNotExistException thrown if the catalog doesn't exist
+ */
+ @throws[CatalogNotExistException]
+ def getCatalog(catalogName: String): ReadableCatalog = {
+ catalogManager.getCatalog(catalogName)
+ }
+
+ /**
+ * Get the current catalog.
+ *
+ * @return the current catalog in CatalogManager
+ */
+ def getCurrentCatalog(): ReadableCatalog = {
+ catalogManager.getCurrentCatalog
+ }
+
+ /**
+ * Get the current database name.
+ *
+ * @return the current database of the current catalog
+ */
+ def getCurrentDatabaseName(): String = {
+ catalogManager.getCurrentCatalog.getCurrentDatabase
+ }
+
+ /**
+ * Set the current catalog.
+ *
+ * @param name name of the catalog to set as current catalog
+ * @throws CatalogNotExistException thrown if the catalog doesn't exist
+ */
+ @throws[CatalogNotExistException]
+ def setCurrentCatalog(name: String): Unit = {
+ catalogManager.setCurrentCatalog(name)
+ }
+
+ /**
+ * Set the current catalog and current database.
+ *
+ * @param catalogName name of the catalog to set as current catalog
+ * @param databaseName name of the database to set as current database
+ * @throws CatalogNotExistException thrown if the catalog doesn't exist
+ * @throws DatabaseNotExistException thrown if the database doesn't exist
+ */
+ @throws[CatalogNotExistException]
+ @throws[DatabaseNotExistException]
+ def setCurrentDatabase(catalogName: String, databaseName: String): Unit = {
+ catalogManager.setCurrentCatalog(catalogName)
+ catalogManager.getCurrentCatalog.setCurrentDatabase(databaseName)
Review comment:
"default database" is only a default value of "current database", and yes,
"current database" is a session property.
I'm not strong on where to store the "current database". I prefer this way
because 1) since every catalog can have such a property, we would need a map in
CatalogManager to maintain the mapping then 2) it matches well with [catalog
configs in SQL Client yaml file
](https://docs.google.com/document/d/1ALxfiGZBaZ8KUNJtoT443hReoPoJEdt9Db2wiJ2LuWA/edit?usp=sharing)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services