This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push:
new cc17d700dd6 [FLINK-29677][table] Prevent dropping the current catalog
(#21134)
cc17d700dd6 is described below
commit cc17d700dd6ec1708ed6b32e6051f0b00371669f
Author: Jane Chan <[email protected]>
AuthorDate: Mon Oct 24 14:58:53 2022 +0800
[FLINK-29677][table] Prevent dropping the current catalog (#21134)
---
.../src/test/resources/sql/catalog_database.q | 14 +++
.../src/test/resources/sql/catalog_database.q | 107 +++++++++++++++++----
.../apache/flink/table/catalog/CatalogManager.java | 3 +
.../flink/table/planner/catalog/CatalogITCase.java | 19 +++-
.../flink/table/api/TableEnvironmentTest.scala | 6 ++
5 files changed, 125 insertions(+), 24 deletions(-)
diff --git
a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
index a2fbae0d300..3704ce2f52c 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q
@@ -88,6 +88,11 @@ drop catalog default_catalog;
[INFO] Execute statement succeed.
!info
+drop catalog c1;
+[ERROR] Could not execute SQL statement. Reason:
+org.apache.flink.table.catalog.exceptions.CatalogException: Cannot drop a
catalog which is currently in use.
+!error
+
# ==========================================================================
# test database
# ==========================================================================
@@ -183,6 +188,15 @@ drop database `default`;
[INFO] Execute statement succeed.
!info
+drop catalog `mod`;
+[ERROR] Could not execute SQL statement. Reason:
+org.apache.flink.table.catalog.exceptions.CatalogException: Cannot drop a
catalog which is currently in use.
+!error
+
+use catalog `c1`;
+[INFO] Execute statement succeed.
+!info
+
drop catalog `mod`;
[INFO] Execute statement succeed.
!info
diff --git
a/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q
b/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q
index fe96c4e1918..d79da350830 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q
@@ -262,6 +262,21 @@ drop database `default`;
1 row in set
!ok
+drop catalog `mod`;
+!output
+org.apache.flink.table.catalog.exceptions.CatalogException: Cannot drop a
catalog which is currently in use.
+!error
+
+use catalog `c1`;
+!output
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
+
drop catalog `mod`;
!output
+--------+
@@ -283,39 +298,71 @@
org.apache.flink.table.catalog.exceptions.CatalogException: A catalog with name
create table MyTable1 (a int, b string) with ('connector' = 'values');
!output
-org.apache.flink.table.api.ValidationException: Catalog mod does not exist.
-!error
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
create table MyTable2 (a int, b string) with ('connector' = 'values');
!output
-org.apache.flink.table.api.ValidationException: Catalog mod does not exist.
-!error
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
# hive catalog is case-insensitive
show tables;
!output
-org.apache.flink.table.api.ValidationException: Catalog mod does not exist
-!error
++------------+
+| table name |
++------------+
+| MyTable1 |
+| MyTable2 |
++------------+
+2 rows in set
+!ok
show views;
!output
-org.apache.flink.table.api.ValidationException: Catalog mod does not exist
-!error
+Empty set
+!ok
create view MyView1 as select 1 + 1;
!output
-org.apache.flink.table.api.ValidationException: Catalog mod does not exist.
-!error
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
create view MyView2 as select 1 + 1;
!output
-org.apache.flink.table.api.ValidationException: Catalog mod does not exist.
-!error
++--------+
+| result |
++--------+
+| OK |
++--------+
+1 row in set
+!ok
show views;
!output
-org.apache.flink.table.api.ValidationException: Catalog mod does not exist
-!error
++-----------+
+| view name |
++-----------+
+| MyView1 |
+| MyView2 |
++-----------+
+2 rows in set
+!ok
# test create with full qualified name
create table c1.db1.MyTable3 (a int, b string) with ('connector' = 'values');
@@ -458,12 +505,16 @@ show tables;
+------------+
| table name |
+------------+
+| MyTable1 |
+| MyTable2 |
| MyTable5 |
| MyTable6 |
+| MyView1 |
+| MyView2 |
| MyView5 |
| MyView6 |
+------------+
-4 rows in set
+8 rows in set
!ok
show views;
@@ -471,10 +522,12 @@ show views;
+-----------+
| view name |
+-----------+
+| MyView1 |
+| MyView2 |
| MyView5 |
| MyView6 |
+-----------+
-2 rows in set
+4 rows in set
!ok
drop table db1.MyTable3;
@@ -563,10 +616,14 @@ show tables;
+------------+
| table name |
+------------+
+| MyTable1 |
+| MyTable2 |
| MyTable5 |
+| MyView1 |
+| MyView2 |
| MyView5 |
+------------+
-2 rows in set
+6 rows in set
!ok
show views;
@@ -574,9 +631,11 @@ show views;
+-----------+
| view name |
+-----------+
+| MyView1 |
+| MyView2 |
| MyView5 |
+-----------+
-1 row in set
+3 rows in set
!ok
# ==========================================================================
@@ -598,11 +657,15 @@ show tables;
+------------+
| table name |
+------------+
+| MyTable1 |
+| MyTable2 |
| MyTable5 |
| MyTable7 |
+| MyView1 |
+| MyView2 |
| MyView5 |
+------------+
-3 rows in set
+7 rows in set
!ok
reset;
@@ -630,10 +693,14 @@ show tables;
+------------+
| table name |
+------------+
+| MyTable1 |
+| MyTable2 |
| MyTable7 |
+| MyView1 |
+| MyView2 |
| MyView5 |
+------------+
-2 rows in set
+6 rows in set
!ok
# ==========================================================================
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index ade4c3b155e..66b6dc2b4d8 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -226,6 +226,9 @@ public final class CatalogManager {
"Catalog name cannot be null or empty.");
if (catalogs.containsKey(catalogName)) {
+ if (currentCatalogName.equals(catalogName)) {
+ throw new CatalogException("Cannot drop a catalog which is
currently in use.");
+ }
Catalog catalog = catalogs.remove(catalogName);
catalog.close();
} else if (!ignoreIfNotExists) {
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java
index 119aa4b51fc..5e95788fc95 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java
@@ -22,6 +22,7 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
@@ -30,6 +31,7 @@ import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalogFactoryOptions;
import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.utils.CatalogManagerMocks;
import org.apache.flink.testutils.ClassLoaderUtils;
import org.apache.flink.util.TemporaryClassLoaderContext;
@@ -43,6 +45,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** IT Case for catalog ddl. */
public class CatalogITCase {
@@ -69,15 +72,23 @@ public class CatalogITCase {
String name = "c1";
TableEnvironment tableEnv = getTableEnvironment();
- String ddl =
+ String createDdl =
String.format(
"create catalog %s with('type'='%s')",
name, GenericInMemoryCatalogFactoryOptions.IDENTIFIER);
- tableEnv.executeSql(ddl);
+ tableEnv.executeSql(createDdl);
assertThat(tableEnv.getCatalog(name)).isPresent();
- ddl = String.format("drop catalog %s", name);
- tableEnv.executeSql(ddl);
+ String dropDdl = String.format("drop catalog %s", name);
+ tableEnv.executeSql(String.format("use catalog %s", name));
+ assertThatThrownBy(() -> tableEnv.executeSql(dropDdl))
+ .isInstanceOf(ValidationException.class)
+ .hasRootCauseExactlyInstanceOf(CatalogException.class)
+ .hasRootCauseMessage("Cannot drop a catalog which is currently
in use.");
+ assertThat(tableEnv.getCatalog(name)).isPresent();
+
+ tableEnv.executeSql("use catalog default_catalog");
+ tableEnv.executeSql(dropDdl);
assertThat(tableEnv.getCatalog(name)).isNotPresent();
}
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
index 791a365831d..e10ad0c93f1 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
@@ -770,6 +770,12 @@ class TableEnvironmentTest {
assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)
assertEquals("my_catalog", tableEnv.getCurrentCatalog)
+ assertThatThrownBy(() => tableEnv.executeSql("DROP CATALOG my_catalog"))
+ .isInstanceOf(classOf[ValidationException])
+ .hasRootCauseMessage("Cannot drop a catalog which is currently in use.")
+
+ tableEnv.executeSql("USE CATALOG default_catalog")
+
val tableResult3 = tableEnv.executeSql("DROP CATALOG my_catalog")
assertEquals(ResultKind.SUCCESS, tableResult3.getResultKind)
assertFalse(tableEnv.getCatalog("my_catalog").isPresent)