This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 51b744bca1bdf53385152ed237f2950525046488
Author: Feng Jin <jinfeng1...@gmail.com>
AuthorDate: Mon May 13 20:08:38 2024 +0800

    [FLINK-35193][table] Support execution of drop materialized table
---
 .../MaterializedTableManager.java                  | 115 +++++++++-
 .../service/operation/OperationExecutor.java       |   9 +
 .../service/MaterializedTableStatementITCase.java  | 241 ++++++++++++++++++---
 .../apache/flink/table/catalog/CatalogManager.java |   4 +-
 4 files changed, 328 insertions(+), 41 deletions(-)

diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
index b4ba12b8755..a51b1885c98 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
@@ -20,6 +20,7 @@ package 
org.apache.flink.table.gateway.service.materializedtable;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogMaterializedTable;
@@ -34,6 +35,7 @@ import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.service.operation.OperationExecutor;
 import org.apache.flink.table.gateway.service.result.ResultFetcher;
 import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.table.operations.command.DescribeJobOperation;
 import org.apache.flink.table.operations.command.StopJobOperation;
 import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperation;
 import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableRefreshOperation;
@@ -93,6 +95,9 @@ public class MaterializedTableManager {
         } else if (op instanceof AlterMaterializedTableResumeOperation) {
             return callAlterMaterializedTableResume(
                     operationExecutor, handle, 
(AlterMaterializedTableResumeOperation) op);
+        } else if (op instanceof DropMaterializedTableOperation) {
+            return callDropMaterializedTableOperation(
+                    operationExecutor, handle, 
(DropMaterializedTableOperation) op);
         }
 
         throw new SqlExecutionException(
@@ -146,8 +151,7 @@ public class MaterializedTableManager {
                     materializedTableIdentifier,
                     e);
             operationExecutor.callExecutableOperation(
-                    handle,
-                    new 
DropMaterializedTableOperation(materializedTableIdentifier, true, false));
+                    handle, new 
DropMaterializedTableOperation(materializedTableIdentifier, true));
             throw e;
         }
     }
@@ -170,7 +174,8 @@ public class MaterializedTableManager {
                         materializedTable.getSerializedRefreshHandler(),
                         
operationExecutor.getSessionContext().getUserClassloader());
 
-        String savepointPath = stopJobWithSavepoint(operationExecutor, handle, 
refreshHandler);
+        String savepointPath =
+                stopJobWithSavepoint(operationExecutor, handle, 
refreshHandler.getJobId());
 
         ContinuousRefreshHandler updateRefreshHandler =
                 new ContinuousRefreshHandler(
@@ -183,9 +188,12 @@ public class MaterializedTableManager {
                         CatalogMaterializedTable.RefreshStatus.SUSPENDED,
                         
materializedTable.getRefreshHandlerDescription().orElse(null),
                         serializeContinuousHandler(updateRefreshHandler));
+        List<TableChange> tableChanges = new ArrayList<>();
+        tableChanges.add(
+                
TableChange.modifyRefreshStatus(CatalogMaterializedTable.RefreshStatus.ACTIVATED));
         AlterMaterializedTableChangeOperation 
alterMaterializedTableChangeOperation =
                 new AlterMaterializedTableChangeOperation(
-                        tableIdentifier, Collections.emptyList(), 
updatedMaterializedTable);
+                        tableIdentifier, tableChanges, 
updatedMaterializedTable);
 
         operationExecutor.callExecutableOperation(handle, 
alterMaterializedTableChangeOperation);
 
@@ -284,8 +292,7 @@ public class MaterializedTableManager {
             // drop materialized table while submit flink streaming job occur 
exception. Thus, weak
             // atomicity is guaranteed
             operationExecutor.callExecutableOperation(
-                    handle,
-                    new 
DropMaterializedTableOperation(materializedTableIdentifier, true, false));
+                    handle, new 
DropMaterializedTableOperation(materializedTableIdentifier, true));
             // log and throw exception
             LOG.error(
                     "Submit continuous refresh job for materialized table {} 
occur exception.",
@@ -414,10 +421,100 @@ public class MaterializedTableManager {
         return insertStatement.toString();
     }
 
-    private static String stopJobWithSavepoint(
-            OperationExecutor executor,
+    private static ResultFetcher callDropMaterializedTableOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            DropMaterializedTableOperation dropMaterializedTableOperation) {
+        ObjectIdentifier tableIdentifier = 
dropMaterializedTableOperation.getTableIdentifier();
+        boolean tableExists = operationExecutor.tableExists(tableIdentifier);
+        if (!tableExists) {
+            if (dropMaterializedTableOperation.isIfExists()) {
+                LOG.info(
+                        "Materialized table {} does not exists, skip the drop 
operation.",
+                        tableIdentifier);
+                return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, 
false);
+            } else {
+                throw new ValidationException(
+                        String.format(
+                                "Materialized table with identifier %s does 
not exist.",
+                                tableIdentifier));
+            }
+        }
+
+        CatalogMaterializedTable materializedTable =
+                getCatalogMaterializedTable(operationExecutor, 
tableIdentifier);
+
+        if (CatalogMaterializedTable.RefreshStatus.ACTIVATED
+                == materializedTable.getRefreshStatus()) {
+            ContinuousRefreshHandler refreshHandler =
+                    deserializeContinuousHandler(
+                            materializedTable.getSerializedRefreshHandler(),
+                            
operationExecutor.getSessionContext().getUserClassloader());
+            // get job running status
+            JobStatus jobStatus = getJobStatus(operationExecutor, handle, 
refreshHandler);
+            if (!jobStatus.isTerminalState()) {
+                try {
+                    cancelJob(operationExecutor, handle, 
refreshHandler.getJobId());
+                } catch (Exception e) {
+                    jobStatus = getJobStatus(operationExecutor, handle, 
refreshHandler);
+                    if (!jobStatus.isTerminalState()) {
+                        throw new SqlExecutionException(
+                                String.format(
+                                        "Failed to drop the materialized table 
%s because the continuous refresh job %s could not be canceled."
+                                                + " The current status of the 
continuous refresh job is %s.",
+                                        tableIdentifier, 
refreshHandler.getJobId(), jobStatus),
+                                e);
+                    } else {
+                        LOG.warn(
+                                "An exception occurred while canceling the 
continuous refresh job {} for materialized table {},"
+                                        + " but since the job is in a terminal 
state, skip the cancel operation.",
+                                refreshHandler.getJobId(),
+                                tableIdentifier);
+                    }
+                }
+            } else {
+                LOG.info(
+                        "No need to cancel the continuous refresh job {} for 
materialized table {} as it is not currently running.",
+                        refreshHandler.getJobId(),
+                        tableIdentifier);
+            }
+        } else if (CatalogMaterializedTable.RefreshStatus.INITIALIZING
+                == materializedTable.getRefreshStatus()) {
+            throw new ValidationException(
+                    String.format(
+                            "Current refresh status of materialized table %s 
is initializing, skip the drop operation.",
+                            tableIdentifier.asSerializableString()));
+        }
+
+        operationExecutor.callExecutableOperation(handle, 
dropMaterializedTableOperation);
+
+        return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+    }
+
+    private static JobStatus getJobStatus(
+            OperationExecutor operationExecutor,
             OperationHandle handle,
             ContinuousRefreshHandler refreshHandler) {
+        ResultFetcher resultFetcher =
+                operationExecutor.callDescribeJobOperation(
+                        operationExecutor.getTableEnvironment(),
+                        handle,
+                        new DescribeJobOperation(refreshHandler.getJobId()));
+        List<RowData> result = fetchAllResults(resultFetcher);
+        String jobStatus = result.get(0).getString(2).toString();
+        return JobStatus.valueOf(jobStatus);
+    }
+
+    private static void cancelJob(
+            OperationExecutor operationExecutor, OperationHandle handle, 
String jobId) {
+        operationExecutor.callStopJobOperation(
+                operationExecutor.getTableEnvironment(),
+                handle,
+                new StopJobOperation(jobId, false, false));
+    }
+
+    private static String stopJobWithSavepoint(
+            OperationExecutor executor, OperationHandle handle, String jobId) {
         // check savepoint dir is configured
         Optional<String> savepointDir =
                 
executor.getSessionContext().getSessionConf().getOptional(SAVEPOINT_DIRECTORY);
@@ -429,7 +526,7 @@ public class MaterializedTableManager {
                 executor.callStopJobOperation(
                         executor.getTableEnvironment(),
                         handle,
-                        new StopJobOperation(refreshHandler.getJobId(), true, 
false));
+                        new StopJobOperation(jobId, true, false));
         List<RowData> results = fetchAllResults(resultFetcher);
         return results.get(0).getString(0).toString();
     }
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index ddd0f930155..a273114a401 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -281,6 +281,15 @@ public class OperationExecutor {
         }
     }
 
+    public boolean tableExists(ObjectIdentifier tableIdentifier) {
+        return sessionContext
+                .getSessionState()
+                .catalogManager
+                .getCatalog(tableIdentifier.getCatalogName())
+                .map(catalog -> 
catalog.tableExists(tableIdentifier.toObjectPath()))
+                .orElse(false);
+    }
+
     public ResolvedCatalogBaseTable<?> getTable(ObjectIdentifier 
tableIdentifier) {
         return getTableEnvironment()
                 .getCatalogManager()
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
index 6eb9bbf89fd..105c51ea597 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
@@ -39,16 +39,20 @@ import org.apache.flink.runtime.rest.util.RestMapperUtils;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogMaterializedTable;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.results.TableInfo;
 import org.apache.flink.table.gateway.api.session.SessionEnvironment;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
 import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
 import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
 import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
 import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
@@ -74,10 +78,12 @@ import java.nio.file.Path;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -87,6 +93,7 @@ import static 
org.apache.flink.table.catalog.CommonCatalogOptions.TABLE_CATALOG_
 import static 
org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.awaitOperationTermination;
 import static 
org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.fetchAllResults;
 import static org.apache.flink.test.util.TestUtils.waitUntilAllTasksAreRunning;
+import static org.apache.flink.test.util.TestUtils.waitUntilJobCanceled;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -134,6 +141,8 @@ public class MaterializedTableStatementITCase {
     private String fileSystemCatalogPath;
     private String fileSystemCatalogName;
 
+    private SessionHandle sessionHandle;
+
     @BeforeAll
     static void setUp(@TempDir Path temporaryFolder) throws Exception {
         service = (SqlGatewayServiceImpl) 
SQL_GATEWAY_SERVICE_EXTENSION.getService();
@@ -167,17 +176,33 @@ public class MaterializedTableStatementITCase {
 
         fileSystemCatalogPath = fileCatalogPath.toString();
         fileSystemCatalogName = TEST_CATALOG_PREFIX + randomStr;
+        // initialize session handle, create test-filesystem catalog and 
register it to catalog
+        // store
+        sessionHandle = initializeSession();
     }
 
     @AfterEach
-    void after(@InjectClusterClient RestClusterClient<?> restClusterClient) 
throws Exception {
-        // cancel all running jobs for releasing mini cluster resources
-        for (JobStatusMessage j : restClusterClient.listJobs().get()) {
-            // cancel all continuous refresh jobs
-            if (j.getJobName().endsWith("continuous_refresh_job")
-                    && (j.getJobState() == JobStatus.RUNNING
-                            || j.getJobState() == JobStatus.CREATED)) {
-                restClusterClient.cancel(j.getJobId()).get();
+    void after() throws Exception {
+        Set<TableInfo> tableInfos =
+                service.listTables(
+                        sessionHandle,
+                        fileSystemCatalogName,
+                        TEST_DEFAULT_DATABASE,
+                        
Collections.singleton(CatalogBaseTable.TableKind.TABLE));
+
+        // drop all materialized tables
+        for (TableInfo tableInfo : tableInfos) {
+            ResolvedCatalogBaseTable<?> resolvedTable =
+                    service.getTable(sessionHandle, tableInfo.getIdentifier());
+            if (CatalogBaseTable.TableKind.MATERIALIZED_TABLE == 
resolvedTable.getTableKind()) {
+                String dropTableDDL =
+                        String.format(
+                                "DROP MATERIALIZED TABLE %s",
+                                
tableInfo.getIdentifier().asSerializableString());
+                OperationHandle dropTableHandle =
+                        service.executeStatement(
+                                sessionHandle, dropTableDDL, -1, new 
Configuration());
+                awaitOperationTermination(service, sessionHandle, 
dropTableHandle);
             }
         }
     }
@@ -185,10 +210,6 @@ public class MaterializedTableStatementITCase {
     @Test
     void testCreateMaterializedTableInContinuousMode(
             @InjectClusterClient RestClusterClient<?> restClusterClient) 
throws Exception {
-        // initialize session handle, create test-filesystem catalog and 
register it to catalog
-        // store
-        SessionHandle sessionHandle = initializeSession();
-
         String materializedTableDDL =
                 "CREATE MATERIALIZED TABLE users_shops"
                         + " PARTITIONED BY (ds)\n"
@@ -267,10 +288,6 @@ public class MaterializedTableStatementITCase {
 
     @Test
     void testCreateMaterializedTableInFullMode() {
-        // initialize session handle, create test-filesystem catalog and 
register it to catalog
-        // store
-        SessionHandle sessionHandle = initializeSession();
-
         String materializedTableDDL =
                 "CREATE MATERIALIZED TABLE users_shops"
                         + " PARTITIONED BY (ds)\n"
@@ -302,14 +319,62 @@ public class MaterializedTableStatementITCase {
                         "Only support create materialized table in continuous 
refresh mode currently.");
     }
 
+    @Test
+    void testCreateMaterializedTableFailed() throws Exception {
+        // create a materialized table with invalid SQL
+        String materializedTableDDL =
+                "CREATE MATERIALIZED TABLE users_shops"
+                        + " PARTITIONED BY (ds)\n"
+                        + " WITH(\n"
+                        + "   'format' = 'json'\n"
+                        + " )\n"
+                        + " FRESHNESS = INTERVAL '30' SECOND\n"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  SUM (payment_amount_cents) AS 
payed_buy_fee_sum,\n"
+                        + "  SUM (1) AS pv\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, 
DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM 
datagenSource"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+        OperationHandle materializedTableHandle =
+                service.executeStatement(
+                        sessionHandle, materializedTableDDL, -1, new 
Configuration());
+
+        assertThatThrownBy(
+                        () ->
+                                awaitOperationTermination(
+                                        service, sessionHandle, 
materializedTableHandle))
+                .cause()
+                .hasMessageContaining(
+                        String.format(
+                                "Submit continuous refresh job for 
materialized table %s occur exception.",
+                                ObjectIdentifier.of(
+                                                fileSystemCatalogName,
+                                                TEST_DEFAULT_DATABASE,
+                                                "users_shops")
+                                        .asSerializableString()));
+
+        // verify the materialized table is not created
+        assertThatThrownBy(
+                        () ->
+                                service.getTable(
+                                        sessionHandle,
+                                        ObjectIdentifier.of(
+                                                fileSystemCatalogName,
+                                                TEST_DEFAULT_DATABASE,
+                                                "users_shops")))
+                .isInstanceOf(SqlGatewayException.class)
+                .hasMessageContaining("Failed to getTable.");
+    }
+
     @Test
     void testAlterMaterializedTableRefresh(
             @InjectClusterClient RestClusterClient<?> restClusterClient) 
throws Exception {
         long timeout = Duration.ofSeconds(20).toMillis();
         long pause = Duration.ofSeconds(2).toMillis();
-        // initialize session handle, create test-filesystem catalog and 
register it to catalog
-        // store
-        SessionHandle sessionHandle = initializeSession();
 
         List<Row> data = new ArrayList<>();
         data.add(Row.of(1L, 1L, 1L, "2024-01-01"));
@@ -434,10 +499,6 @@ public class MaterializedTableStatementITCase {
 
     @Test
     void testAlterMaterializedTableRefreshWithInvalidPartitionSpec() throws 
Exception {
-        // initialize session handle, create test-filesystem catalog and 
register it to catalog
-        // store
-        SessionHandle sessionHandle = initializeSession();
-
         String materializedTableDDL =
                 "CREATE MATERIALIZED TABLE users_shops"
                         + " PARTITIONED BY (ds1, ds2)\n"
@@ -521,10 +582,6 @@ public class MaterializedTableStatementITCase {
             @TempDir Path temporaryPath,
             @InjectClusterClient RestClusterClient<?> restClusterClient)
             throws Exception {
-        // initialize session handle, create test-filesystem catalog and 
register it to catalog
-        // store
-        SessionHandle sessionHandle = initializeSession();
-
         String materializedTableDDL =
                 "CREATE MATERIALIZED TABLE users_shops"
                         + " PARTITIONED BY (ds)\n"
@@ -659,10 +716,6 @@ public class MaterializedTableStatementITCase {
     @Test
     void testAlterMaterializedTableWithoutSavepointDirConfigured(
             @InjectClusterClient RestClusterClient<?> restClusterClient) 
throws Exception {
-        // initialize session handle, create test-filesystem catalog and 
register it to catalog
-        // store
-        SessionHandle sessionHandle = initializeSession();
-
         String materializedTableDDL =
                 "CREATE MATERIALIZED TABLE users_shops"
                         + " PARTITIONED BY (ds)\n"
@@ -720,6 +773,132 @@ public class MaterializedTableStatementITCase {
                         "Savepoint directory is not configured, can't stop job 
with savepoint.");
     }
 
+    @Test
+    void testDropMaterializedTable(@InjectClusterClient RestClusterClient<?> 
restClusterClient)
+            throws Exception {
+        String materializedTableDDL =
+                "CREATE MATERIALIZED TABLE users_shops"
+                        + " PARTITIONED BY (ds)\n"
+                        + " WITH(\n"
+                        + "   'format' = 'debezium-json'\n"
+                        + " )\n"
+                        + " FRESHNESS = INTERVAL '30' SECOND\n"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  SUM (payment_amount_cents) AS 
payed_buy_fee_sum,\n"
+                        + "  SUM (1) AS pv\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, 
DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM 
datagenSource"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+
+        OperationHandle materializedTableHandle =
+                service.executeStatement(
+                        sessionHandle, materializedTableDDL, -1, new 
Configuration());
+        awaitOperationTermination(service, sessionHandle, 
materializedTableHandle);
+
+        // verify materialized table exists
+        ResolvedCatalogBaseTable<?> activeMaterializedTable =
+                service.getTable(
+                        sessionHandle,
+                        ObjectIdentifier.of(
+                                fileSystemCatalogName, TEST_DEFAULT_DATABASE, 
"users_shops"));
+
+        
assertThat(activeMaterializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class);
+
+        // verify background job is running
+        ContinuousRefreshHandler activeRefreshHandler =
+                ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
+                        ((ResolvedCatalogMaterializedTable) 
activeMaterializedTable)
+                                .getSerializedRefreshHandler(),
+                        getClass().getClassLoader());
+        String describeJobDDL = String.format("DESCRIBE JOB '%s'", 
activeRefreshHandler.getJobId());
+        OperationHandle describeJobHandle =
+                service.executeStatement(sessionHandle, describeJobDDL, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, describeJobHandle);
+        List<RowData> jobResults = fetchAllResults(service, sessionHandle, 
describeJobHandle);
+        
assertThat(jobResults.get(0).getString(2).toString()).isEqualTo("RUNNING");
+
+        // drop materialized table
+        String dropMaterializedTableDDL = "DROP MATERIALIZED TABLE IF EXISTS 
users_shops";
+        OperationHandle dropMaterializedTableHandle =
+                service.executeStatement(
+                        sessionHandle, dropMaterializedTableDDL, -1, new 
Configuration());
+        awaitOperationTermination(service, sessionHandle, 
dropMaterializedTableHandle);
+
+        // verify materialized table metadata is removed
+        assertThatThrownBy(
+                        () ->
+                                service.getTable(
+                                        sessionHandle,
+                                        ObjectIdentifier.of(
+                                                fileSystemCatalogName,
+                                                TEST_DEFAULT_DATABASE,
+                                                "users_shops")))
+                .isInstanceOf(SqlGatewayException.class)
+                .hasMessageContaining("Failed to getTable.");
+
+        // verify background job is canceled
+        waitUntilJobCanceled(
+                JobID.fromHexString(activeRefreshHandler.getJobId()), 
restClusterClient);
+
+        String describeJobAfterDropDDL =
+                String.format("DESCRIBE JOB '%s'", 
activeRefreshHandler.getJobId());
+        OperationHandle describeJobAfterDropHandle =
+                service.executeStatement(
+                        sessionHandle, describeJobAfterDropDDL, -1, new 
Configuration());
+        awaitOperationTermination(service, sessionHandle, 
describeJobAfterDropHandle);
+        List<RowData> jobResultsAfterDrop =
+                fetchAllResults(service, sessionHandle, 
describeJobAfterDropHandle);
+        
assertThat(jobResultsAfterDrop.get(0).getString(2).toString()).isEqualTo("CANCELED");
+
+        // verify drop materialized table that doesn't exist
+        String dropNonExistMaterializedTableDDL = "DROP MATERIALIZED TABLE 
users_shops";
+        OperationHandle dropNonExistTableHandle =
+                service.executeStatement(
+                        sessionHandle, dropNonExistMaterializedTableDDL, -1, 
new Configuration());
+
+        assertThatThrownBy(
+                        () ->
+                                awaitOperationTermination(
+                                        service, sessionHandle, 
dropNonExistTableHandle))
+                .rootCause()
+                .isInstanceOf(ValidationException.class)
+                .hasMessage(
+                        String.format(
+                                "Materialized table with identifier %s does 
not exist.",
+                                ObjectIdentifier.of(
+                                                fileSystemCatalogName,
+                                                TEST_DEFAULT_DATABASE,
+                                                "users_shops")
+                                        .asSerializableString()));
+
+        String dropNonExistMaterializedTableDDL2 = "DROP MATERIALIZED TABLE IF 
EXISTS users_shops";
+        OperationHandle dropNonExistMaterializedTableHandle2 =
+                service.executeStatement(
+                        sessionHandle, dropNonExistMaterializedTableDDL2, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, 
dropNonExistMaterializedTableHandle2);
+
+        // Drop a table using drop materialized table statement
+        dropMaterializedTableDDL = "DROP MATERIALIZED TABLE IF EXISTS 
datagenSource";
+        OperationHandle dropTableHandle =
+                service.executeStatement(
+                        sessionHandle, dropMaterializedTableDDL, -1, new 
Configuration());
+        assertThatThrownBy(() -> awaitOperationTermination(service, 
sessionHandle, dropTableHandle))
+                .rootCause()
+                .isInstanceOf(ValidationException.class)
+                .hasMessage(
+                        String.format(
+                                "Table %s is not a materialized table, does 
not support materialized table related operation.",
+                                ObjectIdentifier.of(
+                                                fileSystemCatalogName,
+                                                TEST_DEFAULT_DATABASE,
+                                                "datagenSource")
+                                        .asSerializableString()));
+    }
+
     private SessionHandle initializeSession() {
         SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
         String catalogDDL =
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index 2f92cb2e528..5e8826e6c3a 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -1268,7 +1268,9 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
             ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists, 
boolean isDropTable) {
         Predicate<CatalogBaseTable> filter =
                 isDropTable
-                        ? table -> table instanceof CatalogTable
+                        ? table ->
+                                table instanceof CatalogTable
+                                        || table instanceof 
CatalogMaterializedTable
                         : table -> table instanceof CatalogView;
         // Same name temporary table or view exists.
         if (filter.test(temporaryTables.get(objectIdentifier))) {

Reply via email to