This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 36103ff3a97fd7de656a795484999418c92f9eb5
Author: Feng Jin <[email protected]>
AuthorDate: Tue Sep 10 11:08:04 2024 +0800

    [FLINK-36242][table] Drop all materialized tables manually instead of automatically dropping them in the after method.
---
 .../AbstractMaterializedTableStatementITCase.java  | 43 +++++-----------
 ...GatewayRestEndpointMaterializedTableITCase.java | 10 ++++
 .../service/MaterializedTableStatementITCase.java  | 57 +++++++++++++++++++++-
 3 files changed, 77 insertions(+), 33 deletions(-)

diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractMaterializedTableStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractMaterializedTableStatementITCase.java
index 1a6ee690cbd..7ec2aad15de 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractMaterializedTableStatementITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractMaterializedTableStatementITCase.java
@@ -27,12 +27,10 @@ import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogMaterializedTable;
-import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
-import org.apache.flink.table.gateway.api.results.TableInfo;
 import org.apache.flink.table.gateway.api.session.SessionEnvironment;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
 import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
@@ -47,7 +45,6 @@ import 
org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Order;
@@ -57,12 +54,10 @@ import org.junit.jupiter.api.io.TempDir;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Duration;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -181,32 +176,6 @@ public abstract class 
AbstractMaterializedTableStatementITCase {
         restClusterClient = injectClusterClient;
     }
 
-    @AfterEach
-    void after() throws Exception {
-        Set<TableInfo> tableInfos =
-                service.listTables(
-                        sessionHandle,
-                        fileSystemCatalogName,
-                        TEST_DEFAULT_DATABASE,
-                        
Collections.singleton(CatalogBaseTable.TableKind.TABLE));
-
-        // drop all materialized tables
-        for (TableInfo tableInfo : tableInfos) {
-            ResolvedCatalogBaseTable<?> resolvedTable =
-                    service.getTable(sessionHandle, tableInfo.getIdentifier());
-            if (CatalogBaseTable.TableKind.MATERIALIZED_TABLE == 
resolvedTable.getTableKind()) {
-                String dropTableDDL =
-                        String.format(
-                                "DROP MATERIALIZED TABLE %s",
-                                
tableInfo.getIdentifier().asSerializableString());
-                OperationHandle dropTableHandle =
-                        service.executeStatement(
-                                sessionHandle, dropTableDDL, -1, new 
Configuration());
-                awaitOperationTermination(service, sessionHandle, 
dropTableHandle);
-            }
-        }
-    }
-
     private SessionHandle initializeSession() {
         SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
         String catalogDDL =
@@ -364,4 +333,14 @@ public abstract class 
AbstractMaterializedTableStatementITCase {
                 Duration.ofMillis(pause),
                 "Failed to verify whether the job is finished.");
     }
+
+    public void dropMaterializedTable(ObjectIdentifier objectIdentifier) throws Exception {
+        String dropMaterializedTableDDL =
+                String.format(
+                        "DROP MATERIALIZED TABLE %s", objectIdentifier.asSerializableString());
+        OperationHandle dropMaterializedTableHandle =
+                service.executeStatement(
+                        sessionHandle, dropMaterializedTableDDL, -1, new Configuration());
+        awaitOperationTermination(service, sessionHandle, dropMaterializedTableHandle);
+    }
 }
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointMaterializedTableITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointMaterializedTableITCase.java
index e66453cf3d1..9a019d55ae1 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointMaterializedTableITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointMaterializedTableITCase.java
@@ -140,6 +140,11 @@ public class SqlGatewayRestEndpointMaterializedTableITCase
         GenericMapData clusterInfo = ((GenericMapData) 
results.get(0).getMap(1));
         assertThat(clusterInfo.get(StringData.fromString(TARGET.key())))
                 .isEqualTo(StringData.fromString("remote"));
+
+        // drop the materialized table
+        dropMaterializedTable(
+                ObjectIdentifier.of(
+                        fileSystemCatalogName, TEST_DEFAULT_DATABASE, 
"my_materialized_table"));
     }
 
     @Test
@@ -214,6 +219,11 @@ public class SqlGatewayRestEndpointMaterializedTableITCase
         GenericMapData clusterInfo = ((GenericMapData) 
results.get(0).getMap(1));
         assertThat(clusterInfo.get(StringData.fromString(TARGET.key())))
                 .isEqualTo(StringData.fromString("remote"));
+
+        // drop the materialized table
+        dropMaterializedTable(
+                ObjectIdentifier.of(
+                        fileSystemCatalogName, TEST_DEFAULT_DATABASE, 
"my_materialized_table"));
     }
 
     FetchResultsResponseBody fetchResults(
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
index 32ccbb95a7a..b5f5fc3bd5f 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
@@ -169,6 +169,10 @@ public class MaterializedTableStatementITCase extends 
AbstractMaterializedTableS
         long checkpointInterval =
                 getCheckpointIntervalConfig(restClusterClient, 
activeRefreshHandler.getJobId());
         assertThat(checkpointInterval).isEqualTo(30 * 1000);
+
+        // drop the materialized table
+        dropMaterializedTable(
+                ObjectIdentifier.of(fileSystemCatalogName, 
TEST_DEFAULT_DATABASE, "users_shops"));
     }
 
     @Test
@@ -230,6 +234,10 @@ public class MaterializedTableStatementITCase extends 
AbstractMaterializedTableS
         long actualCheckpointInterval =
                 getCheckpointIntervalConfig(restClusterClient, 
activeRefreshHandler.getJobId());
         assertThat(actualCheckpointInterval).isEqualTo(checkpointInterval);
+
+        // drop the materialized table
+        dropMaterializedTable(
+                ObjectIdentifier.of(fileSystemCatalogName, 
TEST_DEFAULT_DATABASE, "users_shops"));
     }
 
     @Test
@@ -330,10 +338,14 @@ public class MaterializedTableStatementITCase extends 
AbstractMaterializedTableS
                 .containsKey("table.catalog-store.file.path")
                 .doesNotContainKey(WORKFLOW_SCHEDULER_TYPE.key())
                 .doesNotContainKey(RESOURCES_DOWNLOAD_DIR.key());
+
+        // drop the materialized table
+        dropMaterializedTable(
+                ObjectIdentifier.of(fileSystemCatalogName, 
TEST_DEFAULT_DATABASE, "users_shops"));
     }
 
     @Test
-    void testCreateMaterializedTableFailedInInContinuousMode() {
+    void testCreateMaterializedTableFailedInInContinuousMode() throws 
Exception {
         // create a materialized table with invalid SQL
         String materializedTableDDL =
                 "CREATE MATERIALIZED TABLE users_shops"
@@ -429,6 +441,11 @@ public class MaterializedTableStatementITCase extends 
AbstractMaterializedTableS
                                         "SELECT * FROM my_materialized_table 
where ds = '2024-01-02'")
                                 .size())
                 .isEqualTo(1);
+
+        // drop the materialized table
+        dropMaterializedTable(
+                ObjectIdentifier.of(
+                        fileSystemCatalogName, TEST_DEFAULT_DATABASE, 
"my_materialized_table"));
     }
 
     @Test
@@ -509,6 +526,10 @@ public class MaterializedTableStatementITCase extends 
AbstractMaterializedTableS
                         "Currently, refreshing materialized table only 
supports referring to char, varchar and string type partition keys. All 
specified partition keys in partition specs with unsupported types are:\n"
                                 + "\n"
                                 + "ds2");
+
+        // drop the materialized table
+        dropMaterializedTable(
+                ObjectIdentifier.of(fileSystemCatalogName, 
TEST_DEFAULT_DATABASE, "users_shops"));
     }
 
     @Test
@@ -654,6 +675,10 @@ public class MaterializedTableStatementITCase extends 
AbstractMaterializedTableS
                 getJobRestoreSavepointPath(restClusterClient, resumeJobId);
         assertThat(actualRestorePath).isNotEmpty();
         assertThat(actualRestorePath.get()).isEqualTo(actualSavepointPath);
+
+        // drop the materialized table
+        dropMaterializedTable(
+                ObjectIdentifier.of(fileSystemCatalogName, 
TEST_DEFAULT_DATABASE, "users_shops"));
     }
 
     @Test
@@ -714,6 +739,10 @@ public class MaterializedTableStatementITCase extends 
AbstractMaterializedTableS
                 .isInstanceOf(ValidationException.class)
                 .hasMessageContaining(
                         "Savepoint directory is not configured, can't stop job 
with savepoint.");
+
+        // drop the materialized table
+        dropMaterializedTable(
+                ObjectIdentifier.of(fileSystemCatalogName, 
TEST_DEFAULT_DATABASE, "users_shops"));
     }
 
     @Test
@@ -921,6 +950,10 @@ public class MaterializedTableStatementITCase extends 
AbstractMaterializedTableS
                 fromJson((String) 
jobDetail.getJobDataMap().get(WORKFLOW_INFO), WorkflowInfo.class);
         assertThat(workflowInfo.getDynamicOptions())
                 .containsEntry("debezium-json.ignore-parse-errors", "true");
+
+        // drop the materialized table
+        dropMaterializedTable(
+                ObjectIdentifier.of(fileSystemCatalogName, 
TEST_DEFAULT_DATABASE, "users_shops"));
     }
 
     @Test
@@ -1324,6 +1357,11 @@ public class MaterializedTableStatementITCase extends 
AbstractMaterializedTableS
                                         "SELECT * FROM my_materialized_table 
where ds = '2024-01-01'")
                                 .size())
                 .isNotEqualTo(getPartitionSize(data, "2024-01-01"));
+
+        // drop the materialized table
+        dropMaterializedTable(
+                ObjectIdentifier.of(
+                        fileSystemCatalogName, TEST_DEFAULT_DATABASE, 
"my_materialized_table"));
     }
 
     @Test
@@ -1387,6 +1425,13 @@ public class MaterializedTableStatementITCase extends 
AbstractMaterializedTableS
                                         "SELECT * FROM 
my_materialized_table_without_partition_options where ds = '2024-01-02'")
                                 .size())
                 .isEqualTo(getPartitionSize(data, "2024-01-02"));
+
+        // drop the materialized table
+        dropMaterializedTable(
+                ObjectIdentifier.of(
+                        fileSystemCatalogName,
+                        TEST_DEFAULT_DATABASE,
+                        "my_materialized_table_without_partition_options"));
     }
 
     @Test
@@ -1448,6 +1493,11 @@ public class MaterializedTableStatementITCase extends 
AbstractMaterializedTableS
                                         "SELECT * FROM my_materialized_table 
where ds = '2024-01-01'")
                                 .size())
                 .isNotEqualTo(getPartitionSize(data, "2024-01-01"));
+
+        // drop the materialized table
+        dropMaterializedTable(
+                ObjectIdentifier.of(
+                        fileSystemCatalogName, TEST_DEFAULT_DATABASE, 
"my_materialized_table"));
     }
 
     @Test
@@ -1517,6 +1567,11 @@ public class MaterializedTableStatementITCase extends 
AbstractMaterializedTableS
                                                 TEST_DEFAULT_DATABASE,
                                                 "my_materialized_table")
                                         .asSerializableString()));
+
+        // drop the materialized table
+        dropMaterializedTable(
+                ObjectIdentifier.of(
+                        fileSystemCatalogName, TEST_DEFAULT_DATABASE, 
"my_materialized_table"));
     }
 
     private int getPartitionSize(List<Row> data, String partition) {

Reply via email to