This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 51b744bca1bdf53385152ed237f2950525046488 Author: Feng Jin <jinfeng1...@gmail.com> AuthorDate: Mon May 13 20:08:38 2024 +0800 [FLINK-35193][table] Support execution of drop materialized table --- .../MaterializedTableManager.java | 115 +++++++++- .../service/operation/OperationExecutor.java | 9 + .../service/MaterializedTableStatementITCase.java | 241 ++++++++++++++++++--- .../apache/flink/table/catalog/CatalogManager.java | 4 +- 4 files changed, 328 insertions(+), 41 deletions(-) diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java index b4ba12b8755..a51b1885c98 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java @@ -20,6 +20,7 @@ package org.apache.flink.table.gateway.service.materializedtable; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogMaterializedTable; @@ -34,6 +35,7 @@ import org.apache.flink.table.gateway.api.results.ResultSet; import org.apache.flink.table.gateway.service.operation.OperationExecutor; import org.apache.flink.table.gateway.service.result.ResultFetcher; import org.apache.flink.table.gateway.service.utils.SqlExecutionException; +import org.apache.flink.table.operations.command.DescribeJobOperation; import org.apache.flink.table.operations.command.StopJobOperation; import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperation; import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableRefreshOperation; @@ -93,6 +95,9 @@ public class MaterializedTableManager { } else if (op instanceof AlterMaterializedTableResumeOperation) { return callAlterMaterializedTableResume( operationExecutor, handle, (AlterMaterializedTableResumeOperation) op); + } else if (op instanceof DropMaterializedTableOperation) { + return callDropMaterializedTableOperation( + operationExecutor, handle, (DropMaterializedTableOperation) op); } throw new SqlExecutionException( @@ -146,8 +151,7 @@ public class MaterializedTableManager { materializedTableIdentifier, e); operationExecutor.callExecutableOperation( - handle, - new DropMaterializedTableOperation(materializedTableIdentifier, true, false)); + handle, new DropMaterializedTableOperation(materializedTableIdentifier, true)); throw e; } } @@ -170,7 +174,8 @@ public class MaterializedTableManager { materializedTable.getSerializedRefreshHandler(), operationExecutor.getSessionContext().getUserClassloader()); - String savepointPath = stopJobWithSavepoint(operationExecutor, handle, refreshHandler); + String savepointPath = + stopJobWithSavepoint(operationExecutor, handle, refreshHandler.getJobId()); ContinuousRefreshHandler updateRefreshHandler = new ContinuousRefreshHandler( @@ -183,9 +188,12 @@ public class MaterializedTableManager { CatalogMaterializedTable.RefreshStatus.SUSPENDED, materializedTable.getRefreshHandlerDescription().orElse(null), serializeContinuousHandler(updateRefreshHandler)); + List<TableChange> tableChanges = new ArrayList<>(); + tableChanges.add( + TableChange.modifyRefreshStatus(CatalogMaterializedTable.RefreshStatus.ACTIVATED)); AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation = new AlterMaterializedTableChangeOperation( - tableIdentifier, Collections.emptyList(), updatedMaterializedTable); + tableIdentifier, tableChanges, updatedMaterializedTable); operationExecutor.callExecutableOperation(handle, alterMaterializedTableChangeOperation); @@ -284,8 +292,7 @@ public class MaterializedTableManager { // drop materialized table while submit flink streaming job occur exception. Thus, weak // atomicity is guaranteed operationExecutor.callExecutableOperation( - handle, - new DropMaterializedTableOperation(materializedTableIdentifier, true, false)); + handle, new DropMaterializedTableOperation(materializedTableIdentifier, true)); // log and throw exception LOG.error( "Submit continuous refresh job for materialized table {} occur exception.", @@ -414,10 +421,100 @@ public class MaterializedTableManager { return insertStatement.toString(); } - private static String stopJobWithSavepoint( - OperationExecutor executor, + private static ResultFetcher callDropMaterializedTableOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + DropMaterializedTableOperation dropMaterializedTableOperation) { + ObjectIdentifier tableIdentifier = dropMaterializedTableOperation.getTableIdentifier(); + boolean tableExists = operationExecutor.tableExists(tableIdentifier); + if (!tableExists) { + if (dropMaterializedTableOperation.isIfExists()) { + LOG.info( + "Materialized table {} does not exists, skip the drop operation.", + tableIdentifier); + return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false); + } else { + throw new ValidationException( + String.format( + "Materialized table with identifier %s does not exist.", + tableIdentifier)); + } + } + + CatalogMaterializedTable materializedTable = + getCatalogMaterializedTable(operationExecutor, tableIdentifier); + + if (CatalogMaterializedTable.RefreshStatus.ACTIVATED + == materializedTable.getRefreshStatus()) { + ContinuousRefreshHandler refreshHandler = + deserializeContinuousHandler( + materializedTable.getSerializedRefreshHandler(), + operationExecutor.getSessionContext().getUserClassloader()); + // get job running status + JobStatus jobStatus = getJobStatus(operationExecutor, handle, refreshHandler); + if (!jobStatus.isTerminalState()) { + try { + cancelJob(operationExecutor, handle, refreshHandler.getJobId()); + } catch (Exception e) { + jobStatus = getJobStatus(operationExecutor, handle, refreshHandler); + if (!jobStatus.isTerminalState()) { + throw new SqlExecutionException( + String.format( + "Failed to drop the materialized table %s because the continuous refresh job %s could not be canceled." + + " The current status of the continuous refresh job is %s.", + tableIdentifier, refreshHandler.getJobId(), jobStatus), + e); + } else { + LOG.warn( + "An exception occurred while canceling the continuous refresh job {} for materialized table {}," + + " but since the job is in a terminal state, skip the cancel operation.", + refreshHandler.getJobId(), + tableIdentifier); + } + } + } else { + LOG.info( + "No need to cancel the continuous refresh job {} for materialized table {} as it is not currently running.", + refreshHandler.getJobId(), + tableIdentifier); + } + } else if (CatalogMaterializedTable.RefreshStatus.INITIALIZING + == materializedTable.getRefreshStatus()) { + throw new ValidationException( + String.format( + "Current refresh status of materialized table %s is initializing, skip the drop operation.", + tableIdentifier.asSerializableString())); + } + + operationExecutor.callExecutableOperation(handle, dropMaterializedTableOperation); + + return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false); + } + + private static JobStatus getJobStatus( + OperationExecutor operationExecutor, OperationHandle handle, ContinuousRefreshHandler refreshHandler) { + ResultFetcher resultFetcher = + operationExecutor.callDescribeJobOperation( + operationExecutor.getTableEnvironment(), + handle, + new DescribeJobOperation(refreshHandler.getJobId())); + List<RowData> result = fetchAllResults(resultFetcher); + String jobStatus = result.get(0).getString(2).toString(); + return JobStatus.valueOf(jobStatus); + } + + private static void cancelJob( + OperationExecutor operationExecutor, OperationHandle handle, String jobId) { + operationExecutor.callStopJobOperation( + operationExecutor.getTableEnvironment(), + handle, + new StopJobOperation(jobId, false, false)); + } + + private static String stopJobWithSavepoint( + OperationExecutor executor, OperationHandle handle, String jobId) { // check savepoint dir is configured Optional<String> savepointDir = executor.getSessionContext().getSessionConf().getOptional(SAVEPOINT_DIRECTORY); @@ -429,7 +526,7 @@ public class MaterializedTableManager { executor.callStopJobOperation( executor.getTableEnvironment(), handle, - new StopJobOperation(refreshHandler.getJobId(), true, false)); + new StopJobOperation(jobId, true, false)); List<RowData> results = fetchAllResults(resultFetcher); return results.get(0).getString(0).toString(); } diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java index ddd0f930155..a273114a401 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java @@ -281,6 +281,15 @@ public class OperationExecutor { } } + public boolean tableExists(ObjectIdentifier tableIdentifier) { + return sessionContext + .getSessionState() + .catalogManager + .getCatalog(tableIdentifier.getCatalogName()) + .map(catalog -> catalog.tableExists(tableIdentifier.toObjectPath())) + .orElse(false); + } + public ResolvedCatalogBaseTable<?> getTable(ObjectIdentifier tableIdentifier) { return getTableEnvironment() .getCatalogManager() diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java index 6eb9bbf89fd..105c51ea597 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java @@ -39,16 +39,20 @@ import org.apache.flink.runtime.rest.util.RestMapperUtils; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogMaterializedTable; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.data.RowData; import org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.results.TableInfo; import org.apache.flink.table.gateway.api.session.SessionEnvironment; import org.apache.flink.table.gateway.api.session.SessionHandle; import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion; +import org.apache.flink.table.gateway.api.utils.SqlGatewayException; import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler; import org.apache.flink.table.gateway.service.utils.SqlExecutionException; import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension; @@ -74,10 +78,12 @@ import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -87,6 +93,7 @@ import static org.apache.flink.table.catalog.CommonCatalogOptions.TABLE_CATALOG_ import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.awaitOperationTermination; import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.fetchAllResults; import static org.apache.flink.test.util.TestUtils.waitUntilAllTasksAreRunning; +import static org.apache.flink.test.util.TestUtils.waitUntilJobCanceled; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -134,6 +141,8 @@ public class MaterializedTableStatementITCase { private String fileSystemCatalogPath; private String fileSystemCatalogName; + private SessionHandle sessionHandle; + @BeforeAll static void setUp(@TempDir Path temporaryFolder) throws Exception { service = (SqlGatewayServiceImpl) SQL_GATEWAY_SERVICE_EXTENSION.getService(); @@ -167,17 +176,33 @@ public class MaterializedTableStatementITCase { fileSystemCatalogPath = fileCatalogPath.toString(); fileSystemCatalogName = TEST_CATALOG_PREFIX + randomStr; + // initialize session handle, create test-filesystem catalog and register it to catalog + // store + sessionHandle = initializeSession(); } @AfterEach - void after(@InjectClusterClient RestClusterClient<?> restClusterClient) throws Exception { - // cancel all running jobs for releasing mini cluster resources - for (JobStatusMessage j : restClusterClient.listJobs().get()) { - // cancel all continuous refresh jobs - if (j.getJobName().endsWith("continuous_refresh_job") - && (j.getJobState() == JobStatus.RUNNING - || j.getJobState() == JobStatus.CREATED)) { - restClusterClient.cancel(j.getJobId()).get(); + void after() throws Exception { + Set<TableInfo> tableInfos = + service.listTables( + sessionHandle, + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + Collections.singleton(CatalogBaseTable.TableKind.TABLE)); + + // drop all materialized tables + for (TableInfo tableInfo : tableInfos) { + ResolvedCatalogBaseTable<?> resolvedTable = + service.getTable(sessionHandle, tableInfo.getIdentifier()); + if (CatalogBaseTable.TableKind.MATERIALIZED_TABLE == resolvedTable.getTableKind()) { + String dropTableDDL = + String.format( + "DROP MATERIALIZED TABLE %s", + tableInfo.getIdentifier().asSerializableString()); + OperationHandle dropTableHandle = + service.executeStatement( + sessionHandle, dropTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, dropTableHandle); } } } @@ -185,10 +210,6 @@ public class MaterializedTableStatementITCase { @Test void testCreateMaterializedTableInContinuousMode( @InjectClusterClient RestClusterClient<?> restClusterClient) throws Exception { - // initialize session handle, create test-filesystem catalog and register it to catalog - // store - SessionHandle sessionHandle = initializeSession(); - String materializedTableDDL = "CREATE MATERIALIZED TABLE users_shops" + " PARTITIONED BY (ds)\n" @@ -267,10 +288,6 @@ public class MaterializedTableStatementITCase { @Test void testCreateMaterializedTableInFullMode() { - // initialize session handle, create test-filesystem catalog and register it to catalog - // store - SessionHandle sessionHandle = initializeSession(); - String materializedTableDDL = "CREATE MATERIALIZED TABLE users_shops" + " PARTITIONED BY (ds)\n" @@ -302,14 +319,62 @@ public class MaterializedTableStatementITCase { "Only support create materialized table in continuous refresh mode currently."); } + @Test + void testCreateMaterializedTableFailed() throws Exception { + // create a materialized table with invalid SQL + String materializedTableDDL = + "CREATE MATERIALIZED TABLE users_shops" + + " PARTITIONED BY (ds)\n" + + " WITH(\n" + + " 'format' = 'json'\n" + + " )\n" + + " FRESHNESS = INTERVAL '30' SECOND\n" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" + + " SUM (payment_amount_cents) AS payed_buy_fee_sum,\n" + + " SUM (1) AS pv\n" + + " FROM (\n" + + " SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + OperationHandle materializedTableHandle = + service.executeStatement( + sessionHandle, materializedTableDDL, -1, new Configuration()); + + assertThatThrownBy( + () -> + awaitOperationTermination( + service, sessionHandle, materializedTableHandle)) + .cause() + .hasMessageContaining( + String.format( + "Submit continuous refresh job for materialized table %s occur exception.", + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops") + .asSerializableString())); + + // verify the materialized table is not created + assertThatThrownBy( + () -> + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops"))) + .isInstanceOf(SqlGatewayException.class) + .hasMessageContaining("Failed to getTable."); + } + @Test void testAlterMaterializedTableRefresh( @InjectClusterClient RestClusterClient<?> restClusterClient) throws Exception { long timeout = Duration.ofSeconds(20).toMillis(); long pause = Duration.ofSeconds(2).toMillis(); - // initialize session handle, create test-filesystem catalog and register it to catalog - // store - SessionHandle sessionHandle = initializeSession(); List<Row> data = new ArrayList<>(); data.add(Row.of(1L, 1L, 1L, "2024-01-01")); @@ -434,10 +499,6 @@ public class MaterializedTableStatementITCase { @Test void testAlterMaterializedTableRefreshWithInvalidPartitionSpec() throws Exception { - // initialize session handle, create test-filesystem catalog and register it to catalog - // store - SessionHandle sessionHandle = initializeSession(); - String materializedTableDDL = "CREATE MATERIALIZED TABLE users_shops" + " PARTITIONED BY (ds1, ds2)\n" @@ -521,10 +582,6 @@ public class MaterializedTableStatementITCase { @TempDir Path temporaryPath, @InjectClusterClient RestClusterClient<?> restClusterClient) throws Exception { - // initialize session handle, create test-filesystem catalog and register it to catalog - // store - SessionHandle sessionHandle = initializeSession(); - String materializedTableDDL = "CREATE MATERIALIZED TABLE users_shops" + " PARTITIONED BY (ds)\n" @@ -659,10 +716,6 @@ public class MaterializedTableStatementITCase { @Test void testAlterMaterializedTableWithoutSavepointDirConfigured( @InjectClusterClient RestClusterClient<?> restClusterClient) throws Exception { - // initialize session handle, create test-filesystem catalog and register it to catalog - // store - SessionHandle sessionHandle = initializeSession(); - String materializedTableDDL = "CREATE MATERIALIZED TABLE users_shops" + " PARTITIONED BY (ds)\n" @@ -720,6 +773,132 @@ public class MaterializedTableStatementITCase { "Savepoint directory is not configured, can't stop job with savepoint."); } + @Test + void testDropMaterializedTable(@InjectClusterClient RestClusterClient<?> restClusterClient) + throws Exception { + String materializedTableDDL = + "CREATE MATERIALIZED TABLE users_shops" + + " PARTITIONED BY (ds)\n" + + " WITH(\n" + + " 'format' = 'debezium-json'\n" + + " )\n" + + " FRESHNESS = INTERVAL '30' SECOND\n" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" + + " SUM (payment_amount_cents) AS payed_buy_fee_sum,\n" + + " SUM (1) AS pv\n" + + " FROM (\n" + + " SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + + OperationHandle materializedTableHandle = + service.executeStatement( + sessionHandle, materializedTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, materializedTableHandle); + + // verify materialized table exists + ResolvedCatalogBaseTable<?> activeMaterializedTable = + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops")); + + assertThat(activeMaterializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class); + + // verify background job is running + ContinuousRefreshHandler activeRefreshHandler = + ContinuousRefreshHandlerSerializer.INSTANCE.deserialize( + ((ResolvedCatalogMaterializedTable) activeMaterializedTable) + .getSerializedRefreshHandler(), + getClass().getClassLoader()); + String describeJobDDL = String.format("DESCRIBE JOB '%s'", activeRefreshHandler.getJobId()); + OperationHandle describeJobHandle = + service.executeStatement(sessionHandle, describeJobDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, describeJobHandle); + List<RowData> jobResults = fetchAllResults(service, sessionHandle, describeJobHandle); + assertThat(jobResults.get(0).getString(2).toString()).isEqualTo("RUNNING"); + + // drop materialized table + String dropMaterializedTableDDL = "DROP MATERIALIZED TABLE IF EXISTS users_shops"; + OperationHandle dropMaterializedTableHandle = + service.executeStatement( + sessionHandle, dropMaterializedTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, dropMaterializedTableHandle); + + // verify materialized table metadata is removed + assertThatThrownBy( + () -> + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops"))) + .isInstanceOf(SqlGatewayException.class) + .hasMessageContaining("Failed to getTable."); + + // verify background job is canceled + waitUntilJobCanceled( + JobID.fromHexString(activeRefreshHandler.getJobId()), restClusterClient); + + String describeJobAfterDropDDL = + String.format("DESCRIBE JOB '%s'", activeRefreshHandler.getJobId()); + OperationHandle describeJobAfterDropHandle = + service.executeStatement( + sessionHandle, describeJobAfterDropDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, describeJobAfterDropHandle); + List<RowData> jobResultsAfterDrop = + fetchAllResults(service, sessionHandle, describeJobAfterDropHandle); + assertThat(jobResultsAfterDrop.get(0).getString(2).toString()).isEqualTo("CANCELED"); + + // verify drop materialized table that doesn't exist + String dropNonExistMaterializedTableDDL = "DROP MATERIALIZED TABLE users_shops"; + OperationHandle dropNonExistTableHandle = + service.executeStatement( + sessionHandle, dropNonExistMaterializedTableDDL, -1, new Configuration()); + + assertThatThrownBy( + () -> + awaitOperationTermination( + service, sessionHandle, dropNonExistTableHandle)) + .rootCause() + .isInstanceOf(ValidationException.class) + .hasMessage( + String.format( + "Materialized table with identifier %s does not exist.", + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops") + .asSerializableString())); + + String dropNonExistMaterializedTableDDL2 = "DROP MATERIALIZED TABLE IF EXISTS users_shops"; + OperationHandle dropNonExistMaterializedTableHandle2 = + service.executeStatement( + sessionHandle, dropNonExistMaterializedTableDDL2, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, dropNonExistMaterializedTableHandle2); + + // Drop a table using drop materialized table statement + dropMaterializedTableDDL = "DROP MATERIALIZED TABLE IF EXISTS datagenSource"; + OperationHandle dropTableHandle = + service.executeStatement( + sessionHandle, dropMaterializedTableDDL, -1, new Configuration()); + assertThatThrownBy(() -> awaitOperationTermination(service, sessionHandle, dropTableHandle)) + .rootCause() + .isInstanceOf(ValidationException.class) + .hasMessage( + String.format( + "Table %s is not a materialized table, does not support materialized table related operation.", + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "datagenSource") + .asSerializableString())); + } + private SessionHandle initializeSession() { SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment); String catalogDDL = diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java index 2f92cb2e528..5e8826e6c3a 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java @@ -1268,7 +1268,9 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists, boolean isDropTable) { Predicate<CatalogBaseTable> filter = isDropTable - ? table -> table instanceof CatalogTable + ? table -> + table instanceof CatalogTable + || table instanceof CatalogMaterializedTable : table -> table instanceof CatalogView; // Same name temporary table or view exists. if (filter.test(temporaryTables.get(objectIdentifier))) {