dawidwys commented on a change in pull request #8214: [FLINK-11476] [table]
Create CatalogManager to manage multiple catalogs and encapsulate Calcite schema
URL: https://github.com/apache/flink/pull/8214#discussion_r280663142
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
##########
@@ -215,6 +218,74 @@ abstract class TableEnvironment(val config: TableConfig) {
rels.map(_.asInstanceOf[ExecNode[_, _]])
}
+ /**
+ * Register an [[ReadableCatalog]] under a unique name.
+ *
+ * @param name the name under which the catalog will be registered
+ * @param catalog the catalog to register
+ * @throws CatalogAlreadyExistsException thrown if the catalog already exists
+ */
+ @throws[CatalogAlreadyExistsException]
+ def registerCatalog(name: String, catalog: ReadableCatalog): Unit = {
+ catalogManager.registerCatalog(name, catalog)
+ }
+
+ /**
+ * Get a registered [[ReadableCatalog]].
+ *
+ * @param catalogName name of the catalog to get
+ * @return the requested catalog
+ * @throws CatalogNotExistException thrown if the catalog doesn't exist
+ */
+ @throws[CatalogNotExistException]
+ def getCatalog(catalogName: String): ReadableCatalog = {
+ catalogManager.getCatalog(catalogName)
+ }
+
+ /**
+ * Get the current catalog.
+ *
+ * @return the current catalog in CatalogManager
+ */
+ def getCurrentCatalog(): ReadableCatalog = {
+ catalogManager.getCurrentCatalog
+ }
+
+ /**
+ * Get the current database name.
+ *
+ * @return the current database of the current catalog
+ */
+ def getCurrentDatabaseName(): String = {
+ catalogManager.getCurrentCatalog.getCurrentDatabase
+ }
+
+ /**
+ * Set the current catalog.
+ *
+ * @param name name of the catalog to set as current catalog
+ * @throws CatalogNotExistException thrown if the catalog doesn't exist
+ */
+ @throws[CatalogNotExistException]
+ def setCurrentCatalog(name: String): Unit = {
+ catalogManager.setCurrentCatalog(name)
+ }
+
+ /**
+ * Set the current catalog and current database.
+ *
+ * @param catalogName name of the catalog to set as current catalog
+ * @param databaseName name of the database to set as current database
+ * @throws CatalogNotExistException thrown if the catalog doesn't exist
+ * @throws DatabaseNotExistException thrown if the database doesn't exist
+ */
+ @throws[CatalogNotExistException]
+ @throws[DatabaseNotExistException]
+ def setCurrentDatabase(catalogName: String, databaseName: String): Unit = {
+ catalogManager.setCurrentCatalog(catalogName)
+ catalogManager.getCurrentCatalog.setCurrentDatabase(databaseName)
Review comment:
There can only be a single current database per session, not one current
DB per catalog, so we don't need a `Map` in `CatalogManager`.
Have you checked what the SQL standard says about it? As far as I understand
it (and I did check), there is always a single default prefix per session
(either catalog, or catalog + schema, where a schema is equivalent to a DB),
so the current DB is not set per catalog.
What if the same catalog is used by multiple users in different sessions?
How would you retrieve the original default DB for a new session? As I said
originally, this is a property of the session, and I strongly believe it
should not be stored in the catalog. WDYT?
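
To make the suggestion concrete, here is a minimal sketch of keeping the current catalog and current database as session state in the manager rather than in the catalog. It is an illustration only, not the PR's code: `SimpleCatalog`, `InMemoryCatalog`, and `SessionCatalogManager` are hypothetical names, and `IllegalArgumentException` stands in for `CatalogNotExistException` / `DatabaseNotExistException`.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a catalog: it lists its databases and holds
// no "current database" of its own.
trait SimpleCatalog {
  def listDatabases: Set[String]
}

final class InMemoryCatalog(databases: Set[String]) extends SimpleCatalog {
  override def listDatabases: Set[String] = databases
}

// Hypothetical session-scoped manager: one instance per session, holding
// exactly one (current catalog, current database) pair.
final class SessionCatalogManager(
    defaultCatalogName: String,
    defaultCatalog: SimpleCatalog,
    defaultDatabase: String) {

  require(
    defaultCatalog.listDatabases.contains(defaultDatabase),
    s"Database $defaultDatabase does not exist in catalog $defaultCatalogName")

  private val catalogs =
    mutable.LinkedHashMap[String, SimpleCatalog](defaultCatalogName -> defaultCatalog)

  // Session state lives here, not in the catalog, so no per-catalog Map is needed.
  private var currentCatalogName: String = defaultCatalogName
  private var currentDatabaseName: String = defaultDatabase

  def registerCatalog(name: String, catalog: SimpleCatalog): Unit = {
    require(!catalogs.contains(name), s"Catalog $name already exists")
    catalogs.put(name, catalog)
  }

  def setCurrentDatabase(catalogName: String, databaseName: String): Unit = {
    val catalog = catalogs.getOrElse(
      catalogName,
      throw new IllegalArgumentException(s"Catalog $catalogName does not exist"))
    require(
      catalog.listDatabases.contains(databaseName),
      s"Database $databaseName does not exist in catalog $catalogName")
    // Both fields change only after validation, so a failed call leaves
    // the session's current prefix untouched.
    currentCatalogName = catalogName
    currentDatabaseName = databaseName
  }

  def getCurrentCatalogName: String = currentCatalogName
  def getCurrentDatabaseName: String = currentDatabaseName
}
```

With this shape, two sessions can share one catalog instance without seeing each other's current database, and a new session simply starts from the default passed to its own manager:

```scala
val shared = new InMemoryCatalog(Set("default", "sales"))
val sessionA = new SessionCatalogManager("cat", shared, "default")
val sessionB = new SessionCatalogManager("cat", shared, "default")

sessionA.setCurrentDatabase("cat", "sales")
// sessionA now resolves against cat.sales; sessionB still uses cat.default.
```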