This is an automated email from the ASF dual-hosted git repository.
renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d8e77674a88 [FLINK-32403][table] Add database related operations in
CatalogManager (#22869)
d8e77674a88 is described below
commit d8e77674a885feba22dd079656e4b39f33fa5da1
Author: Shammon FY <[email protected]>
AuthorDate: Thu Jul 20 14:46:43 2023 +0800
[FLINK-32403][table] Add database related operations in CatalogManager
(#22869)
---
.../apache/flink/table/catalog/CatalogManager.java | 67 ++++++++++++++++++++++
.../operations/ddl/AlterDatabaseOperation.java | 6 +-
.../operations/ddl/CreateDatabaseOperation.java | 5 +-
.../operations/ddl/DropDatabaseOperation.java | 5 +-
.../operations/SqlDdlToOperationConverterTest.java | 10 ++--
5 files changed, 78 insertions(+), 15 deletions(-)
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index 2ec64890e93..4be17444b9a 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -28,6 +28,8 @@ import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
@@ -1066,4 +1068,69 @@ public final class CatalogManager implements
CatalogRegistry {
final ResolvedSchema resolvedSchema =
view.getUnresolvedSchema().resolve(schemaResolver);
return new ResolvedCatalogView(view, resolvedSchema);
}
+
+ /**
+ * Create a database.
+ *
+ * @param catalogName Name of the catalog for database
+ * @param databaseName Name of the database to be created
+ * @param database The database definition
+ * @param ignoreIfExists Flag to specify behavior when a database with the
given name already
+ * exists: if set to false, throw a DatabaseAlreadyExistException, if
set to true, do
+ * nothing.
+ * @throws DatabaseAlreadyExistException if the given database already
exists and ignoreIfExists
+ * is false
+ * @throws CatalogException in case of any runtime exception
+ */
+ public void createDatabase(
+ String catalogName,
+ String databaseName,
+ CatalogDatabase database,
+ boolean ignoreIfExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+ Catalog catalog = getCatalogOrThrowException(catalogName);
+ catalog.createDatabase(databaseName, database, ignoreIfExists);
+ }
+
+ /**
+ * Drop a database.
+ *
+ * @param catalogName Name of the catalog for database.
+ * @param databaseName Name of the database to be dropped.
+ * @param ignoreIfNotExists Flag to specify behavior when the database
does not exist: if set to
+ * false, throw an exception, if set to true, do nothing.
+ * @param cascade Flag to specify behavior when the database contains
table or function: if set
+ * to true, delete all tables and functions in the database and then
delete the database, if
+ * set to false, throw an exception.
+ * @throws DatabaseNotExistException if the given database does not exist
+ * @throws DatabaseNotEmptyException if the given database is not empty
and isRestrict is true
+ * @throws CatalogException in case of any runtime exception
+ */
+ public void dropDatabase(
+ String catalogName, String databaseName, boolean
ignoreIfNotExists, boolean cascade)
+ throws DatabaseNotExistException, DatabaseNotEmptyException,
CatalogException {
+ Catalog catalog = getCatalogOrError(catalogName);
+ catalog.dropDatabase(databaseName, ignoreIfNotExists, cascade);
+ }
+
+ /**
+ * Modify an existing database.
+ *
+ * @param catalogName Name of the catalog for database
+ * @param databaseName Name of the database to be dropped
+ * @param newDatabase The new database definition
+ * @param ignoreIfNotExists Flag to specify behavior when the given
database does not exist: if
+ * set to false, throw an exception, if set to true, do nothing.
+ * @throws DatabaseNotExistException if the given database does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ public void alterDatabase(
+ String catalogName,
+ String databaseName,
+ CatalogDatabase newDatabase,
+ boolean ignoreIfNotExists)
+ throws DatabaseNotExistException, CatalogException {
+ Catalog catalog = getCatalogOrError(catalogName);
+ catalog.alterDatabase(databaseName, newDatabase, ignoreIfNotExists);
+ }
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterDatabaseOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterDatabaseOperation.java
index 28350614281..e089b1124f1 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterDatabaseOperation.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterDatabaseOperation.java
@@ -22,7 +22,6 @@ import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableResultImpl;
import org.apache.flink.table.api.internal.TableResultInternal;
-import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.operations.Operation;
@@ -70,9 +69,10 @@ public class AlterDatabaseOperation implements
AlterOperation {
@Override
public TableResultInternal execute(Context ctx) {
- Catalog catalog =
ctx.getCatalogManager().getCatalogOrThrowException(getCatalogName());
try {
- catalog.alterDatabase(getDatabaseName(), getCatalogDatabase(),
false);
+ ctx.getCatalogManager()
+ .alterDatabase(
+ getCatalogName(), getDatabaseName(),
getCatalogDatabase(), false);
return TableResultImpl.TABLE_RESULT_OK;
} catch (DatabaseNotExistException e) {
throw new ValidationException(
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateDatabaseOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateDatabaseOperation.java
index ea938ffa108..7f9cfab77d4 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateDatabaseOperation.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateDatabaseOperation.java
@@ -22,7 +22,6 @@ import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableResultImpl;
import org.apache.flink.table.api.internal.TableResultInternal;
-import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.operations.Operation;
@@ -80,9 +79,9 @@ public class CreateDatabaseOperation implements
CreateOperation {
@Override
public TableResultInternal execute(Context ctx) {
- Catalog catalog =
ctx.getCatalogManager().getCatalogOrThrowException(catalogName);
try {
- catalog.createDatabase(databaseName, catalogDatabase,
ignoreIfExists);
+ ctx.getCatalogManager()
+ .createDatabase(catalogName, databaseName,
catalogDatabase, ignoreIfExists);
return TableResultImpl.TABLE_RESULT_OK;
} catch (DatabaseAlreadyExistException e) {
throw new ValidationException(
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropDatabaseOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropDatabaseOperation.java
index 6e93144e00a..509b2288805 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropDatabaseOperation.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropDatabaseOperation.java
@@ -22,7 +22,6 @@ import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableResultImpl;
import org.apache.flink.table.api.internal.TableResultInternal;
-import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -68,9 +67,9 @@ public class DropDatabaseOperation implements DropOperation {
@Override
public TableResultInternal execute(Context ctx) {
- Catalog catalog =
ctx.getCatalogManager().getCatalogOrThrowException(getCatalogName());
try {
- catalog.dropDatabase(getDatabaseName(), isIfExists(), isCascade());
+ ctx.getCatalogManager()
+ .dropDatabase(getCatalogName(), getDatabaseName(),
isIfExists(), isCascade());
return TableResultImpl.TABLE_RESULT_OK;
} catch (DatabaseNotExistException | DatabaseNotEmptyException e) {
throw new ValidationException(
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
index 4694b68ba8b..289804fd2c2 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
@@ -165,11 +165,8 @@ public class SqlDdlToOperationConverterTest extends
SqlNodeToOperationConversion
@Test
public void testAlterDatabase() throws Exception {
catalogManager.registerCatalog("cat1", new
GenericInMemoryCatalog("default", "default"));
- catalogManager
- .getCatalog("cat1")
- .get()
- .createDatabase(
- "db1", new CatalogDatabaseImpl(new HashMap<>(),
"db1_comment"), true);
+ catalogManager.createDatabase(
+ "cat1", "db1", new CatalogDatabaseImpl(new HashMap<>(),
"db1_comment"), true);
final String sql = "alter database cat1.db1 set ('k1'='v1',
'K2'='V2')";
Operation operation = parse(sql);
assertThat(operation).isInstanceOf(AlterDatabaseOperation.class);
@@ -2256,7 +2253,8 @@ public class SqlDdlToOperationConverterTest extends
SqlNodeToOperationConversion
if (!catalogManager.getCatalog("cat1").isPresent()) {
catalogManager.registerCatalog("cat1", catalog);
}
- catalog.createDatabase("db1", new CatalogDatabaseImpl(new HashMap<>(),
null), true);
+ catalogManager.createDatabase(
+ "cat1", "db1", new CatalogDatabaseImpl(new HashMap<>(), null),
true);
Schema.Builder builder =
Schema.newBuilder()
.column("a", DataTypes.INT().notNull())