bowenli86 commented on a change in pull request #10296: [FLINK-14691][table]Add 
use/create/drop/alter database operation and support it in flink/blink planner
URL: https://github.com/apache/flink/pull/10296#discussion_r350379505
 
 

 ##########
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 ##########
 @@ -455,6 +457,93 @@ public void createTable(CatalogBaseTable table, 
ObjectIdentifier objectIdentifie
                        "CreateTable");
        }
 
+       /**
+        * Creates a database in a given fully qualified path.
+        *
+        * @param catalogName The catalog where database will be created.
+        * @param databaseName Name of database to be created.
+        * @param database The database to be created.
+        * @param ignoreIfExists If false exception will be thrown if a 
database exists in the given path.
+        */
+       public void createDatabase(String catalogName,
+                                       String databaseName,
+                                       CatalogDatabase database,
+                                       boolean ignoreIfExists,
+                                       boolean ignoreNoCatalog) {
+               Optional<Catalog> catalog = getCatalog(catalogName);
+               if (catalog.isPresent()) {
+                       try {
+                               catalog.get().createDatabase(databaseName, 
database, ignoreIfExists);
+                       } catch (DatabaseAlreadyExistException e) {
+                               throw new ValidationException(
+                                               String.format("Could not 
execute %s in path %s", "CREATE DATABASE", catalogName), e);
+                       } catch (Exception e) {
+                               throw new TableException(
+                                               String.format("Could not 
execute %s in path %s", "CREATE DATABASE", catalogName), e);
+                       }
+               } else if (!ignoreNoCatalog) {
+                       throw new ValidationException(String.format("Catalog %s 
does not exist.", catalogName));
+               }
+       }
+
+       /**
+        * Drop a database in a given path.
+        *
+        * @param catalogName        The catalog where database will be deleted.
+        * @param databaseName       Name of database to be deleted.
+        * @param ignoreIfNotExists  If false, an exception will be thrown if the 
database does not exist in the given path.
+        * @param isRestrict         Flag to specify behavior when the database 
contains table:
+        *                              if set to false, delete all tables in 
the database and then delete the database,
+        *                              if set to true, throw an exception.
+        */
+       public void dropDatabase(String catalogName,
+                                       String databaseName,
+                                       boolean ignoreIfNotExists,
+                                       boolean isRestrict,
+                                       boolean ignoreNoCatalog) {
+               Optional<Catalog> catalog = getCatalog(catalogName);
+               if (catalog.isPresent()) {
+                       try {
+                               catalog.get().dropDatabase(databaseName, 
ignoreIfNotExists, isRestrict);
+                       } catch (DatabaseNotExistException | 
DatabaseNotEmptyException e) {
+                               throw new ValidationException(
+                                               String.format("Could not 
execute %s in path %s", "DROP DATABASE", catalogName), e);
+                       } catch (Exception e) {
+                               throw new TableException(
+                                               String.format("Could not 
execute %s in path %s", "DROP DATABASE", catalogName), e);
+                       }
+               } else if (!ignoreNoCatalog) {
+                       throw new ValidationException(String.format("Catalog %s 
does not exist.", catalogName));
+               }
+       }
+
+       /**
+        * Alter a database in a given path.
+        *
+        * @param catalogName     The catalog where the database will be altered.
+        * @param databaseName    Name of the database to be altered.
+        * @param catalogDatabase New catalogDatabase.
+        */
+       public void alterDatabase(String catalogName,
 
 Review comment:
   ditto

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to