This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 36103ff3a97fd7de656a795484999418c92f9eb5 Author: Feng Jin <[email protected]> AuthorDate: Tue Sep 10 11:08:04 2024 +0800 [FLINK-36242][table] Drop all materialized tables manually instead of automatically dropping them in the after method. --- .../AbstractMaterializedTableStatementITCase.java | 43 +++++----------- ...GatewayRestEndpointMaterializedTableITCase.java | 10 ++++ .../service/MaterializedTableStatementITCase.java | 57 +++++++++++++++++++++- 3 files changed, 77 insertions(+), 33 deletions(-) diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractMaterializedTableStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractMaterializedTableStatementITCase.java index 1a6ee690cbd..7ec2aad15de 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractMaterializedTableStatementITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractMaterializedTableStatementITCase.java @@ -27,12 +27,10 @@ import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.jobgraph.JobType; import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; -import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogMaterializedTable; -import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.data.RowData; import org.apache.flink.table.gateway.api.operation.OperationHandle; -import org.apache.flink.table.gateway.api.results.TableInfo; import org.apache.flink.table.gateway.api.session.SessionEnvironment; import org.apache.flink.table.gateway.api.session.SessionHandle; import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion; @@ -47,7 +45,6 @@ import org.apache.flink.testutils.executor.TestExecutorExtension; import org.apache.flink.types.Row; import org.apache.flink.util.concurrent.ExecutorThreadFactory; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Order; @@ -57,12 +54,10 @@ import org.junit.jupiter.api.io.TempDir; import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -181,32 +176,6 @@ public abstract class AbstractMaterializedTableStatementITCase { restClusterClient = injectClusterClient; } - @AfterEach - void after() throws Exception { - Set<TableInfo> tableInfos = - service.listTables( - sessionHandle, - fileSystemCatalogName, - TEST_DEFAULT_DATABASE, - Collections.singleton(CatalogBaseTable.TableKind.TABLE)); - - // drop all materialized tables - for (TableInfo tableInfo : tableInfos) { - ResolvedCatalogBaseTable<?> resolvedTable = - service.getTable(sessionHandle, tableInfo.getIdentifier()); - if (CatalogBaseTable.TableKind.MATERIALIZED_TABLE == resolvedTable.getTableKind()) { - String dropTableDDL = - String.format( - "DROP MATERIALIZED TABLE %s", - tableInfo.getIdentifier().asSerializableString()); - OperationHandle dropTableHandle = - service.executeStatement( - sessionHandle, dropTableDDL, -1, new Configuration()); - awaitOperationTermination(service, sessionHandle, dropTableHandle); - } - } - } - private SessionHandle initializeSession() { SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment); String catalogDDL = @@ -364,4 +333,14 @@ public abstract class AbstractMaterializedTableStatementITCase { Duration.ofMillis(pause), "Failed to verify whether the job is finished."); } + + public void dropMaterializedTable(ObjectIdentifier objectIdentifier) throws Exception { + String dropMaterializedTableDDL = + String.format( + "DROP MATERIALIZED TABLE %s", objectIdentifier.asSerializableString()); + OperationHandle dropMaterializedTableHandle = + service.executeStatement( + sessionHandle, dropMaterializedTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, dropMaterializedTableHandle); + } } diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointMaterializedTableITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointMaterializedTableITCase.java index e66453cf3d1..9a019d55ae1 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointMaterializedTableITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointMaterializedTableITCase.java @@ -140,6 +140,11 @@ public class SqlGatewayRestEndpointMaterializedTableITCase GenericMapData clusterInfo = ((GenericMapData) results.get(0).getMap(1)); assertThat(clusterInfo.get(StringData.fromString(TARGET.key()))) .isEqualTo(StringData.fromString("remote")); + + // drop the materialized table + dropMaterializedTable( + ObjectIdentifier.of( + fileSystemCatalogName, TEST_DEFAULT_DATABASE, "my_materialized_table")); } @Test @@ -214,6 +219,11 @@ public class SqlGatewayRestEndpointMaterializedTableITCase GenericMapData clusterInfo = ((GenericMapData) results.get(0).getMap(1)); assertThat(clusterInfo.get(StringData.fromString(TARGET.key()))) .isEqualTo(StringData.fromString("remote")); + + // drop the materialized table + dropMaterializedTable( + ObjectIdentifier.of( + fileSystemCatalogName, TEST_DEFAULT_DATABASE, "my_materialized_table")); } FetchResultsResponseBody fetchResults( diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java index 32ccbb95a7a..b5f5fc3bd5f 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java @@ -169,6 +169,10 @@ public class MaterializedTableStatementITCase extends AbstractMaterializedTableS long checkpointInterval = getCheckpointIntervalConfig(restClusterClient, activeRefreshHandler.getJobId()); assertThat(checkpointInterval).isEqualTo(30 * 1000); + + // drop the materialized table + dropMaterializedTable( + ObjectIdentifier.of(fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops")); } @Test @@ -230,6 +234,10 @@ public class MaterializedTableStatementITCase extends AbstractMaterializedTableS long actualCheckpointInterval = getCheckpointIntervalConfig(restClusterClient, activeRefreshHandler.getJobId()); assertThat(actualCheckpointInterval).isEqualTo(checkpointInterval); + + // drop the materialized table + dropMaterializedTable( + ObjectIdentifier.of(fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops")); } @Test @@ -330,10 +338,14 @@ public class MaterializedTableStatementITCase extends AbstractMaterializedTableS .containsKey("table.catalog-store.file.path") .doesNotContainKey(WORKFLOW_SCHEDULER_TYPE.key()) .doesNotContainKey(RESOURCES_DOWNLOAD_DIR.key()); + + // drop the materialized table + dropMaterializedTable( + ObjectIdentifier.of(fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops")); } @Test - void testCreateMaterializedTableFailedInInContinuousMode() { + void testCreateMaterializedTableFailedInInContinuousMode() throws Exception { // create a materialized table with invalid SQL String materializedTableDDL = "CREATE MATERIALIZED TABLE users_shops" @@ -429,6 +441,11 @@ public class MaterializedTableStatementITCase extends AbstractMaterializedTableS "SELECT * FROM my_materialized_table where ds = '2024-01-02'") .size()) .isEqualTo(1); + + // drop the materialized table + dropMaterializedTable( + ObjectIdentifier.of( + fileSystemCatalogName, TEST_DEFAULT_DATABASE, "my_materialized_table")); } @Test @@ -509,6 +526,10 @@ public class MaterializedTableStatementITCase extends AbstractMaterializedTableS "Currently, refreshing materialized table only supports referring to char, varchar and string type partition keys. All specified partition keys in partition specs with unsupported types are:\n" + "\n" + "ds2"); + + // drop the materialized table + dropMaterializedTable( + ObjectIdentifier.of(fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops")); } @Test @@ -654,6 +675,10 @@ public class MaterializedTableStatementITCase extends AbstractMaterializedTableS getJobRestoreSavepointPath(restClusterClient, resumeJobId); assertThat(actualRestorePath).isNotEmpty(); assertThat(actualRestorePath.get()).isEqualTo(actualSavepointPath); + + // drop the materialized table + dropMaterializedTable( + ObjectIdentifier.of(fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops")); } @Test @@ -714,6 +739,10 @@ public class MaterializedTableStatementITCase extends AbstractMaterializedTableS .isInstanceOf(ValidationException.class) .hasMessageContaining( "Savepoint directory is not configured, can't stop job with savepoint."); + + // drop the materialized table + dropMaterializedTable( + ObjectIdentifier.of(fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops")); } @Test @@ -921,6 +950,10 @@ public class MaterializedTableStatementITCase extends AbstractMaterializedTableS fromJson((String) jobDetail.getJobDataMap().get(WORKFLOW_INFO), WorkflowInfo.class); assertThat(workflowInfo.getDynamicOptions()) .containsEntry("debezium-json.ignore-parse-errors", "true"); + + // drop the materialized table + dropMaterializedTable( + ObjectIdentifier.of(fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops")); } @Test @@ -1324,6 +1357,11 @@ public class MaterializedTableStatementITCase extends AbstractMaterializedTableS "SELECT * FROM my_materialized_table where ds = '2024-01-01'") .size()) .isNotEqualTo(getPartitionSize(data, "2024-01-01")); + + // drop the materialized table + dropMaterializedTable( + ObjectIdentifier.of( + fileSystemCatalogName, TEST_DEFAULT_DATABASE, "my_materialized_table")); } @Test @@ -1387,6 +1425,13 @@ public class MaterializedTableStatementITCase extends AbstractMaterializedTableS "SELECT * FROM my_materialized_table_without_partition_options where ds = '2024-01-02'") .size()) .isEqualTo(getPartitionSize(data, "2024-01-02")); + + // drop the materialized table + dropMaterializedTable( + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "my_materialized_table_without_partition_options")); } @Test @@ -1448,6 +1493,11 @@ public class MaterializedTableStatementITCase extends AbstractMaterializedTableS "SELECT * FROM my_materialized_table where ds = '2024-01-01'") .size()) .isNotEqualTo(getPartitionSize(data, "2024-01-01")); + + // drop the materialized table + dropMaterializedTable( + ObjectIdentifier.of( + fileSystemCatalogName, TEST_DEFAULT_DATABASE, "my_materialized_table")); } @Test @@ -1517,6 +1567,11 @@ public class MaterializedTableStatementITCase extends AbstractMaterializedTableS TEST_DEFAULT_DATABASE, "my_materialized_table") .asSerializableString())); + + // drop the materialized table + dropMaterializedTable( + ObjectIdentifier.of( + fileSystemCatalogName, TEST_DEFAULT_DATABASE, "my_materialized_table")); } private int getPartitionSize(List<Row> data, String partition) {
