bowenli86 commented on a change in pull request #10296: [FLINK-14691][table]Add
use/create/drop/alter database operation and support it in flink/blink planner
URL: https://github.com/apache/flink/pull/10296#discussion_r350379505
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
##########
@@ -455,6 +457,93 @@ public void createTable(CatalogBaseTable table,
ObjectIdentifier objectIdentifie
"CreateTable");
}
+ /**
+ * Creates a database in a given fully qualified path.
+ *
+ * @param catalogName The catalog where database will be created.
+ * @param databaseName Name of database to be created.
+ * @param database The database to be created.
+ * @param ignoreIfExists If false, an exception will be thrown if a
database already exists in the given path.
+ */
+ public void createDatabase(String catalogName,
+ String databaseName,
+ CatalogDatabase database,
+ boolean ignoreIfExists,
+ boolean ignoreNoCatalog) {
+ Optional<Catalog> catalog = getCatalog(catalogName);
+ if (catalog.isPresent()) {
+ try {
+ catalog.get().createDatabase(databaseName,
database, ignoreIfExists);
+ } catch (DatabaseAlreadyExistException e) {
+ throw new ValidationException(
+ String.format("Could not
execute %s in path %s", "CREATE DATABASE", catalogName), e);
+ } catch (Exception e) {
+ throw new TableException(
+ String.format("Could not
execute %s in path %s", "CREATE DATABASE", catalogName), e);
+ }
+ } else if (!ignoreNoCatalog) {
+ throw new ValidationException(String.format("Catalog %s
does not exist.", catalogName));
+ }
+ }
+
+ /**
+ * Drop a database in a given path.
+ *
+ * @param catalogName The catalog where database will be deleted.
+ * @param databaseName Name of database to be deleted.
+ * @param ignoreIfNotExists If false, an exception will be thrown if the
database does not exist in the given path.
+ * @param isRestrict Flag to specify behavior when the database
contains tables:
+ * if set to false, delete all tables in
the database and then delete the database,
+ * if set to true, throw an exception.
+ */
+ public void dropDatabase(String catalogName,
+ String databaseName,
+ boolean ignoreIfNotExists,
+ boolean isRestrict,
+ boolean ignoreNoCatalog) {
+ Optional<Catalog> catalog = getCatalog(catalogName);
+ if (catalog.isPresent()) {
+ try {
+ catalog.get().dropDatabase(databaseName,
ignoreIfNotExists, isRestrict);
+ } catch (DatabaseNotExistException |
DatabaseNotEmptyException e) {
+ throw new ValidationException(
+ String.format("Could not
execute %s in path %s", "DROP DATABASE", catalogName), e);
+ } catch (Exception e) {
+ throw new TableException(
+ String.format("Could not
execute %s in path %s", "DROP DATABASE", catalogName), e);
+ }
+ } else if (!ignoreNoCatalog) {
+ throw new ValidationException(String.format("Catalog %s
does not exist.", catalogName));
+ }
+ }
+
+ /**
+ * Alter a database in a given path.
+ *
+ * @param catalogName The catalog where database will be altered.
+ * @param databaseName Name of database to be altered.
+ * @param catalogDatabase New catalogDatabase.
+ */
+ public void alterDatabase(String catalogName,
Review comment:
ditto
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services