This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d8e77674a88 [FLINK-32403][table] Add database related operations in 
CatalogManager (#22869)
d8e77674a88 is described below

commit d8e77674a885feba22dd079656e4b39f33fa5da1
Author: Shammon FY <[email protected]>
AuthorDate: Thu Jul 20 14:46:43 2023 +0800

    [FLINK-32403][table] Add database related operations in CatalogManager 
(#22869)
---
 .../apache/flink/table/catalog/CatalogManager.java | 67 ++++++++++++++++++++++
 .../operations/ddl/AlterDatabaseOperation.java     |  6 +-
 .../operations/ddl/CreateDatabaseOperation.java    |  5 +-
 .../operations/ddl/DropDatabaseOperation.java      |  5 +-
 .../operations/SqlDdlToOperationConverterTest.java | 10 ++--
 5 files changed, 78 insertions(+), 15 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index 2ec64890e93..4be17444b9a 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -28,6 +28,8 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
@@ -1066,4 +1068,69 @@ public final class CatalogManager implements 
CatalogRegistry {
         final ResolvedSchema resolvedSchema = 
view.getUnresolvedSchema().resolve(schemaResolver);
         return new ResolvedCatalogView(view, resolvedSchema);
     }
+
+    /**
+     * Create a database.
+     *
+     * @param catalogName Name of the catalog for database
+     * @param databaseName Name of the database to be created
+     * @param database The database definition
+     * @param ignoreIfExists Flag to specify behavior when a database with the 
given name already
+     *     exists: if set to false, throw a DatabaseAlreadyExistException, if 
set to true, do
+     *     nothing.
+     * @throws DatabaseAlreadyExistException if the given database already 
exists and ignoreIfExists
+     *     is false
+     * @throws CatalogException in case of any runtime exception
+     */
+    public void createDatabase(
+            String catalogName,
+            String databaseName,
+            CatalogDatabase database,
+            boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        Catalog catalog = getCatalogOrThrowException(catalogName);
+        catalog.createDatabase(databaseName, database, ignoreIfExists);
+    }
+
+    /**
+     * Drop a database.
+     *
+     * @param catalogName Name of the catalog for database.
+     * @param databaseName Name of the database to be dropped.
+     * @param ignoreIfNotExists Flag to specify behavior when the database 
does not exist: if set to
+     *     false, throw an exception, if set to true, do nothing.
+     * @param cascade Flag to specify behavior when the database contains 
tables or functions: if set
+     *     to true, delete all tables and functions in the database and then 
delete the database, if
+     *     set to false, throw an exception.
+     * @throws DatabaseNotExistException if the given database does not exist
+     * @throws DatabaseNotEmptyException if the given database is not empty 
and cascade is false
+     * @throws CatalogException in case of any runtime exception
+     */
+    public void dropDatabase(
+            String catalogName, String databaseName, boolean 
ignoreIfNotExists, boolean cascade)
+            throws DatabaseNotExistException, DatabaseNotEmptyException, 
CatalogException {
+        Catalog catalog = getCatalogOrError(catalogName);
+        catalog.dropDatabase(databaseName, ignoreIfNotExists, cascade);
+    }
+
+    /**
+     * Modify an existing database.
+     *
+     * @param catalogName Name of the catalog for database
+     * @param databaseName Name of the database to be modified
+     * @param newDatabase The new database definition
+     * @param ignoreIfNotExists Flag to specify behavior when the given 
database does not exist: if
+     *     set to false, throw an exception, if set to true, do nothing.
+     * @throws DatabaseNotExistException if the given database does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    public void alterDatabase(
+            String catalogName,
+            String databaseName,
+            CatalogDatabase newDatabase,
+            boolean ignoreIfNotExists)
+            throws DatabaseNotExistException, CatalogException {
+        Catalog catalog = getCatalogOrError(catalogName);
+        catalog.alterDatabase(databaseName, newDatabase, ignoreIfNotExists);
+    }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterDatabaseOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterDatabaseOperation.java
index 28350614281..e089b1124f1 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterDatabaseOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterDatabaseOperation.java
@@ -22,7 +22,6 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.internal.TableResultImpl;
 import org.apache.flink.table.api.internal.TableResultInternal;
-import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.operations.Operation;
@@ -70,9 +69,10 @@ public class AlterDatabaseOperation implements 
AlterOperation {
 
     @Override
     public TableResultInternal execute(Context ctx) {
-        Catalog catalog = 
ctx.getCatalogManager().getCatalogOrThrowException(getCatalogName());
         try {
-            catalog.alterDatabase(getDatabaseName(), getCatalogDatabase(), 
false);
+            ctx.getCatalogManager()
+                    .alterDatabase(
+                            getCatalogName(), getDatabaseName(), 
getCatalogDatabase(), false);
             return TableResultImpl.TABLE_RESULT_OK;
         } catch (DatabaseNotExistException e) {
             throw new ValidationException(
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateDatabaseOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateDatabaseOperation.java
index ea938ffa108..7f9cfab77d4 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateDatabaseOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateDatabaseOperation.java
@@ -22,7 +22,6 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.internal.TableResultImpl;
 import org.apache.flink.table.api.internal.TableResultInternal;
-import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.operations.Operation;
@@ -80,9 +79,9 @@ public class CreateDatabaseOperation implements 
CreateOperation {
 
     @Override
     public TableResultInternal execute(Context ctx) {
-        Catalog catalog = 
ctx.getCatalogManager().getCatalogOrThrowException(catalogName);
         try {
-            catalog.createDatabase(databaseName, catalogDatabase, 
ignoreIfExists);
+            ctx.getCatalogManager()
+                    .createDatabase(catalogName, databaseName, 
catalogDatabase, ignoreIfExists);
             return TableResultImpl.TABLE_RESULT_OK;
         } catch (DatabaseAlreadyExistException e) {
             throw new ValidationException(
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropDatabaseOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropDatabaseOperation.java
index 6e93144e00a..509b2288805 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropDatabaseOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropDatabaseOperation.java
@@ -22,7 +22,6 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.internal.TableResultImpl;
 import org.apache.flink.table.api.internal.TableResultInternal;
-import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 
@@ -68,9 +67,9 @@ public class DropDatabaseOperation implements DropOperation {
 
     @Override
     public TableResultInternal execute(Context ctx) {
-        Catalog catalog = 
ctx.getCatalogManager().getCatalogOrThrowException(getCatalogName());
         try {
-            catalog.dropDatabase(getDatabaseName(), isIfExists(), isCascade());
+            ctx.getCatalogManager()
+                    .dropDatabase(getCatalogName(), getDatabaseName(), 
isIfExists(), isCascade());
             return TableResultImpl.TABLE_RESULT_OK;
         } catch (DatabaseNotExistException | DatabaseNotEmptyException e) {
             throw new ValidationException(
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
index 4694b68ba8b..289804fd2c2 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
@@ -165,11 +165,8 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
     @Test
     public void testAlterDatabase() throws Exception {
         catalogManager.registerCatalog("cat1", new 
GenericInMemoryCatalog("default", "default"));
-        catalogManager
-                .getCatalog("cat1")
-                .get()
-                .createDatabase(
-                        "db1", new CatalogDatabaseImpl(new HashMap<>(), 
"db1_comment"), true);
+        catalogManager.createDatabase(
+                "cat1", "db1", new CatalogDatabaseImpl(new HashMap<>(), 
"db1_comment"), true);
         final String sql = "alter database cat1.db1 set ('k1'='v1', 
'K2'='V2')";
         Operation operation = parse(sql);
         assertThat(operation).isInstanceOf(AlterDatabaseOperation.class);
@@ -2256,7 +2253,8 @@ public class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversion
         if (!catalogManager.getCatalog("cat1").isPresent()) {
             catalogManager.registerCatalog("cat1", catalog);
         }
-        catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(), 
null), true);
+        catalogManager.createDatabase(
+                "cat1", "db1", new CatalogDatabaseImpl(new HashMap<>(), null), 
true);
         Schema.Builder builder =
                 Schema.newBuilder()
                         .column("a", DataTypes.INT().notNull())

Reply via email to