lsyldliu commented on code in PR #24877:
URL: https://github.com/apache/flink/pull/24877#discussion_r1623739592


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -302,21 +315,47 @@ private ResultFetcher callAlterMaterializedTableSuspend(
                         refreshHandler.getJobId(),
                         savepointPath);
 
-        CatalogMaterializedTable updatedMaterializedTable =
-                materializedTable.copy(
-                        CatalogMaterializedTable.RefreshStatus.SUSPENDED,
-                        
materializedTable.getRefreshHandlerDescription().orElse(null),
-                        serializeContinuousHandler(updateRefreshHandler));
-        List<TableChange> tableChanges = new ArrayList<>();
-        tableChanges.add(
-                
TableChange.modifyRefreshStatus(CatalogMaterializedTable.RefreshStatus.ACTIVATED));
-        AlterMaterializedTableChangeOperation 
alterMaterializedTableChangeOperation =
-                new AlterMaterializedTableChangeOperation(
-                        tableIdentifier, tableChanges, 
updatedMaterializedTable);
+        updateRefreshHandler(
+                operationExecutor,
+                handle,
+                tableIdentifier,
+                materializedTable,
+                CatalogMaterializedTable.RefreshStatus.SUSPENDED,
+                updateRefreshHandler.asSummaryString(),
+                serializeContinuousHandler(updateRefreshHandler));
+    }
 
-        operationExecutor.callExecutableOperation(handle, 
alterMaterializedTableChangeOperation);
+    private void suspendRefreshWorkflow(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            ObjectIdentifier tableIdentifier,
+            CatalogMaterializedTable materializedTable) {
 
-        return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+        try {
+            RefreshHandlerSerializer refreshHandlerSerializer =
+                    workflowScheduler.getRefreshHandlerSerializer();

Review Comment:
   You should check that `workflowScheduler` is not null before suspending.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -328,24 +367,81 @@ private ResultFetcher callAlterMaterializedTableResume(
                 getCatalogMaterializedTable(operationExecutor, 
tableIdentifier);
 
         if (CatalogMaterializedTable.RefreshMode.CONTINUOUS
-                != catalogMaterializedTable.getRefreshMode()) {
-            throw new SqlExecutionException(
-                    "Only support resume continuous refresh job currently.");
+                == catalogMaterializedTable.getRefreshMode()) {
+            resumeContinuousRefreshJob(
+                    operationExecutor,
+                    handle,
+                    tableIdentifier,
+                    catalogMaterializedTable,
+                    op.getDynamicOptions());
+        } else {
+            resumeRefreshWorkflow(
+                    operationExecutor,
+                    handle,
+                    tableIdentifier,
+                    catalogMaterializedTable,
+                    op.getDynamicOptions());
         }
 
-        ContinuousRefreshHandler continuousRefreshHandler =
+        return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+    }
+
+    private void resumeContinuousRefreshJob(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            ObjectIdentifier tableIdentifier,
+            CatalogMaterializedTable catalogMaterializedTable,
+            Map<String, String> dynamicOptions) {
+        ContinuousRefreshHandler refreshHandler =
                 deserializeContinuousHandler(
                         
catalogMaterializedTable.getSerializedRefreshHandler());
-        Optional<String> restorePath = 
continuousRefreshHandler.getRestorePath();
+
+        Optional<String> restorePath = refreshHandler.getRestorePath();
         executeContinuousRefreshJob(
                 operationExecutor,
                 handle,
                 catalogMaterializedTable,
                 tableIdentifier,
-                op.getDynamicOptions(),
+                dynamicOptions,
                 restorePath);
+    }
 
-        return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+    private void resumeRefreshWorkflow(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            ObjectIdentifier tableIdentifier,
+            CatalogMaterializedTable catalogMaterializedTable,
+            Map<String, String> dynamicOptions) {
+        try {
+            RefreshHandlerSerializer refreshHandlerSerializer =

Review Comment:
   ```suggestion
               RefreshHandlerSerializer<?> refreshHandlerSerializer =
   ```



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -652,6 +654,158 @@ void 
testAlterMaterializedTableWithoutSavepointDirConfigured() throws Exception
                         "Savepoint directory is not configured, can't stop job 
with savepoint.");
     }
 
+    @Test
+    void testAlterMaterializedSuspendAndResumeInFullMode() throws Exception {
+        createMaterializedTableInFullMode("users_shops", 
Collections.emptyList());
+
+        ResolvedCatalogMaterializedTable activeMaterializedTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(activeMaterializedTable.getRefreshStatus())
+                .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);
+
+        // suspend materialized table
+        String alterMaterializedTableSuspendDDL = "ALTER MATERIALIZED TABLE 
users_shops SUSPEND";
+        OperationHandle alterMaterializedTableSuspendHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableSuspendDDL, -1, 
new Configuration());
+
+        awaitOperationTermination(service, sessionHandle, 
alterMaterializedTableSuspendHandle);
+
+        ResolvedCatalogMaterializedTable suspendMaterializedTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(suspendMaterializedTable.getRefreshStatus())
+                .isEqualTo(CatalogMaterializedTable.RefreshStatus.SUSPENDED);
+
+        // verify workflow is suspended
+        byte[] refreshHandler = 
suspendMaterializedTable.getSerializedRefreshHandler();
+        EmbeddedRefreshHandler suspendRefreshHandler =
+                EmbeddedRefreshHandlerSerializer.INSTANCE.deserialize(
+                        refreshHandler, getClass().getClassLoader());
+
+        String workflowName = suspendRefreshHandler.getWorkflowName();
+        String workflowGroup = suspendRefreshHandler.getWorkflowGroup();
+        EmbeddedQuartzScheduler embeddedWorkflowScheduler =
+                SQL_GATEWAY_REST_ENDPOINT_EXTENSION
+                        .getSqlGatewayRestEndpoint()
+                        .getQuartzScheduler();
+        JobKey jobKey = new JobKey(workflowName, workflowGroup);
+        Trigger.TriggerState suspendTriggerState =
+                embeddedWorkflowScheduler
+                        .getQuartzScheduler()
+                        .getTriggerState(new TriggerKey(workflowName, 
workflowGroup));

Review Comment:
   Prefer the static factory `TriggerKey.triggerKey(workflowName, workflowGroup)` over `new TriggerKey(workflowName, workflowGroup)`.



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -652,6 +654,158 @@ void 
testAlterMaterializedTableWithoutSavepointDirConfigured() throws Exception
                         "Savepoint directory is not configured, can't stop job 
with savepoint.");
     }
 
+    @Test
+    void testAlterMaterializedSuspendAndResumeInFullMode() throws Exception {
+        createMaterializedTableInFullMode("users_shops", 
Collections.emptyList());
+
+        ResolvedCatalogMaterializedTable activeMaterializedTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(activeMaterializedTable.getRefreshStatus())
+                .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);
+
+        // suspend materialized table
+        String alterMaterializedTableSuspendDDL = "ALTER MATERIALIZED TABLE 
users_shops SUSPEND";
+        OperationHandle alterMaterializedTableSuspendHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableSuspendDDL, -1, 
new Configuration());
+
+        awaitOperationTermination(service, sessionHandle, 
alterMaterializedTableSuspendHandle);
+
+        ResolvedCatalogMaterializedTable suspendMaterializedTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(suspendMaterializedTable.getRefreshStatus())
+                .isEqualTo(CatalogMaterializedTable.RefreshStatus.SUSPENDED);
+
+        // verify workflow is suspended
+        byte[] refreshHandler = 
suspendMaterializedTable.getSerializedRefreshHandler();
+        EmbeddedRefreshHandler suspendRefreshHandler =
+                EmbeddedRefreshHandlerSerializer.INSTANCE.deserialize(
+                        refreshHandler, getClass().getClassLoader());
+
+        String workflowName = suspendRefreshHandler.getWorkflowName();
+        String workflowGroup = suspendRefreshHandler.getWorkflowGroup();
+        EmbeddedQuartzScheduler embeddedWorkflowScheduler =
+                SQL_GATEWAY_REST_ENDPOINT_EXTENSION
+                        .getSqlGatewayRestEndpoint()
+                        .getQuartzScheduler();
+        JobKey jobKey = new JobKey(workflowName, workflowGroup);
+        Trigger.TriggerState suspendTriggerState =
+                embeddedWorkflowScheduler
+                        .getQuartzScheduler()
+                        .getTriggerState(new TriggerKey(workflowName, 
workflowGroup));
+
+        assertThat(suspendTriggerState).isEqualTo(Trigger.TriggerState.PAUSED);
+
+        // resume materialized table
+        String alterMaterializedTableResumeDDL =
+                "ALTER MATERIALIZED TABLE users_shops RESUME WITH 
('debezium-json.ignore-parse-errors' = 'true')";
+        OperationHandle alterMaterializedTableResumeHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableResumeDDL, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, 
alterMaterializedTableResumeHandle);
+
+        ResolvedCatalogMaterializedTable resumedCatalogMaterializedTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(resumedCatalogMaterializedTable.getOptions())
+                .doesNotContainKey("debezium-json.ignore-parse-errors");
+        assertThat(resumedCatalogMaterializedTable.getRefreshStatus())
+                .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);
+
+        // verify workflow is resumed
+        refreshHandler = 
resumedCatalogMaterializedTable.getSerializedRefreshHandler();
+        EmbeddedRefreshHandler resumeRefreshHandler =
+                EmbeddedRefreshHandlerSerializer.INSTANCE.deserialize(
+                        refreshHandler, getClass().getClassLoader());
+
+        
assertThat(resumeRefreshHandler.getWorkflowName()).isEqualTo(workflowName);
+        
assertThat(resumeRefreshHandler.getWorkflowGroup()).isEqualTo(workflowGroup);
+
+        JobDetail jobDetail = 
embeddedWorkflowScheduler.getQuartzScheduler().getJobDetail(jobKey);
+        Trigger.TriggerState resumedTriggerState =
+                embeddedWorkflowScheduler
+                        .getQuartzScheduler()
+                        .getTriggerState(new TriggerKey(workflowName, 
workflowGroup));
+        assertThat(resumedTriggerState).isEqualTo(Trigger.TriggerState.NORMAL);
+
+        WorkflowInfo workflowInfo =
+                fromJson((String) 
jobDetail.getJobDataMap().get(WORKFLOW_INFO), WorkflowInfo.class);
+        assertThat(workflowInfo.getDynamicOptions())
+                .containsEntry("debezium-json.ignore-parse-errors", "true");
+
+        // delete the workflow
+        embeddedWorkflowScheduler.deleteScheduleWorkflow(jobKey.getName(), 
jobKey.getGroup());
+    }
+
+    private void createMaterializedTableInFullMode(String tableName, List<Row> 
data)

Review Comment:
   Why don't you reuse the util method 
`createAndVerifyCreateMaterializedTableWithData`?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -302,21 +315,47 @@ private ResultFetcher callAlterMaterializedTableSuspend(
                         refreshHandler.getJobId(),
                         savepointPath);
 
-        CatalogMaterializedTable updatedMaterializedTable =
-                materializedTable.copy(
-                        CatalogMaterializedTable.RefreshStatus.SUSPENDED,
-                        
materializedTable.getRefreshHandlerDescription().orElse(null),
-                        serializeContinuousHandler(updateRefreshHandler));
-        List<TableChange> tableChanges = new ArrayList<>();
-        tableChanges.add(
-                
TableChange.modifyRefreshStatus(CatalogMaterializedTable.RefreshStatus.ACTIVATED));
-        AlterMaterializedTableChangeOperation 
alterMaterializedTableChangeOperation =
-                new AlterMaterializedTableChangeOperation(
-                        tableIdentifier, tableChanges, 
updatedMaterializedTable);
+        updateRefreshHandler(
+                operationExecutor,
+                handle,
+                tableIdentifier,
+                materializedTable,
+                CatalogMaterializedTable.RefreshStatus.SUSPENDED,
+                updateRefreshHandler.asSummaryString(),
+                serializeContinuousHandler(updateRefreshHandler));
+    }
 
-        operationExecutor.callExecutableOperation(handle, 
alterMaterializedTableChangeOperation);
+    private void suspendRefreshWorkflow(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            ObjectIdentifier tableIdentifier,
+            CatalogMaterializedTable materializedTable) {
 
-        return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+        try {
+            RefreshHandlerSerializer refreshHandlerSerializer =
+                    workflowScheduler.getRefreshHandlerSerializer();
+            RefreshHandler refreshHandler =
+                    refreshHandlerSerializer.deserialize(
+                            materializedTable.getSerializedRefreshHandler(), 
userCodeClassLoader);
+            ModifyRefreshWorkflow modifyRefreshWorkflow =
+                    new SuspendRefreshWorkflow(refreshHandler);
+            workflowScheduler.modifyRefreshWorkflow(modifyRefreshWorkflow);
+
+            updateRefreshHandler(
+                    operationExecutor,
+                    handle,
+                    tableIdentifier,
+                    materializedTable,
+                    CatalogMaterializedTable.RefreshStatus.SUSPENDED,
+                    refreshHandler.asSummaryString(),
+                    materializedTable.getSerializedRefreshHandler());
+        } catch (Exception e) {
+            throw new SqlExecutionException(
+                    String.format(
+                            "Failed to suspend the refresh job for 
materialized table %s.",

Review Comment:
   ```suggestion
                               "Failed to suspend the refresh workflow for 
materialized table %s.",
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -328,24 +367,81 @@ private ResultFetcher callAlterMaterializedTableResume(
                 getCatalogMaterializedTable(operationExecutor, 
tableIdentifier);
 
         if (CatalogMaterializedTable.RefreshMode.CONTINUOUS
-                != catalogMaterializedTable.getRefreshMode()) {
-            throw new SqlExecutionException(
-                    "Only support resume continuous refresh job currently.");
+                == catalogMaterializedTable.getRefreshMode()) {
+            resumeContinuousRefreshJob(
+                    operationExecutor,
+                    handle,
+                    tableIdentifier,
+                    catalogMaterializedTable,
+                    op.getDynamicOptions());
+        } else {
+            resumeRefreshWorkflow(
+                    operationExecutor,
+                    handle,
+                    tableIdentifier,
+                    catalogMaterializedTable,
+                    op.getDynamicOptions());
         }
 
-        ContinuousRefreshHandler continuousRefreshHandler =
+        return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+    }
+
+    private void resumeContinuousRefreshJob(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            ObjectIdentifier tableIdentifier,
+            CatalogMaterializedTable catalogMaterializedTable,
+            Map<String, String> dynamicOptions) {
+        ContinuousRefreshHandler refreshHandler =
                 deserializeContinuousHandler(
                         
catalogMaterializedTable.getSerializedRefreshHandler());
-        Optional<String> restorePath = 
continuousRefreshHandler.getRestorePath();
+
+        Optional<String> restorePath = refreshHandler.getRestorePath();
         executeContinuousRefreshJob(
                 operationExecutor,
                 handle,
                 catalogMaterializedTable,
                 tableIdentifier,
-                op.getDynamicOptions(),
+                dynamicOptions,
                 restorePath);
+    }
 
-        return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+    private void resumeRefreshWorkflow(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            ObjectIdentifier tableIdentifier,
+            CatalogMaterializedTable catalogMaterializedTable,
+            Map<String, String> dynamicOptions) {
+        try {
+            RefreshHandlerSerializer refreshHandlerSerializer =
+                    workflowScheduler.getRefreshHandlerSerializer();

Review Comment:
   Ditto: check that `workflowScheduler` is not null before resuming.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -328,24 +367,81 @@ private ResultFetcher callAlterMaterializedTableResume(
                 getCatalogMaterializedTable(operationExecutor, 
tableIdentifier);
 
         if (CatalogMaterializedTable.RefreshMode.CONTINUOUS
-                != catalogMaterializedTable.getRefreshMode()) {
-            throw new SqlExecutionException(
-                    "Only support resume continuous refresh job currently.");
+                == catalogMaterializedTable.getRefreshMode()) {
+            resumeContinuousRefreshJob(
+                    operationExecutor,
+                    handle,
+                    tableIdentifier,
+                    catalogMaterializedTable,
+                    op.getDynamicOptions());
+        } else {
+            resumeRefreshWorkflow(
+                    operationExecutor,
+                    handle,
+                    tableIdentifier,
+                    catalogMaterializedTable,
+                    op.getDynamicOptions());
         }
 
-        ContinuousRefreshHandler continuousRefreshHandler =
+        return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+    }
+
+    private void resumeContinuousRefreshJob(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            ObjectIdentifier tableIdentifier,
+            CatalogMaterializedTable catalogMaterializedTable,
+            Map<String, String> dynamicOptions) {
+        ContinuousRefreshHandler refreshHandler =
                 deserializeContinuousHandler(
                         
catalogMaterializedTable.getSerializedRefreshHandler());
-        Optional<String> restorePath = 
continuousRefreshHandler.getRestorePath();
+
+        Optional<String> restorePath = refreshHandler.getRestorePath();
         executeContinuousRefreshJob(
                 operationExecutor,
                 handle,
                 catalogMaterializedTable,
                 tableIdentifier,
-                op.getDynamicOptions(),
+                dynamicOptions,
                 restorePath);
+    }
 
-        return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+    private void resumeRefreshWorkflow(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            ObjectIdentifier tableIdentifier,
+            CatalogMaterializedTable catalogMaterializedTable,
+            Map<String, String> dynamicOptions) {
+        try {
+            RefreshHandlerSerializer refreshHandlerSerializer =
+                    workflowScheduler.getRefreshHandlerSerializer();
+            RefreshHandler refreshHandler =
+                    refreshHandlerSerializer.deserialize(
+                            
catalogMaterializedTable.getSerializedRefreshHandler(),
+                            userCodeClassLoader);
+            ModifyRefreshWorkflow modifyRefreshWorkflow =
+                    new ResumeRefreshWorkflow(refreshHandler, dynamicOptions);
+            workflowScheduler.modifyRefreshWorkflow(modifyRefreshWorkflow);
+
+            updateRefreshHandler(
+                    operationExecutor,
+                    handle,
+                    tableIdentifier,
+                    catalogMaterializedTable,
+                    CatalogMaterializedTable.RefreshStatus.ACTIVATED,
+                    refreshHandler.asSummaryString(),
+                    catalogMaterializedTable.getSerializedRefreshHandler());
+        } catch (Exception e) {
+            LOG.error(
+                    "Failed to resume the refresh job for materialized table 
{}.",
+                    tableIdentifier,
+                    e);
+            throw new SqlExecutionException(
+                    String.format(
+                            "Failed to resume the refresh job for materialized 
table %s.",

Review Comment:
   Ditto: this message should say "refresh workflow" rather than "refresh job".



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -302,21 +315,47 @@ private ResultFetcher callAlterMaterializedTableSuspend(
                         refreshHandler.getJobId(),
                         savepointPath);
 
-        CatalogMaterializedTable updatedMaterializedTable =
-                materializedTable.copy(
-                        CatalogMaterializedTable.RefreshStatus.SUSPENDED,
-                        
materializedTable.getRefreshHandlerDescription().orElse(null),
-                        serializeContinuousHandler(updateRefreshHandler));
-        List<TableChange> tableChanges = new ArrayList<>();
-        tableChanges.add(
-                
TableChange.modifyRefreshStatus(CatalogMaterializedTable.RefreshStatus.ACTIVATED));
-        AlterMaterializedTableChangeOperation 
alterMaterializedTableChangeOperation =
-                new AlterMaterializedTableChangeOperation(
-                        tableIdentifier, tableChanges, 
updatedMaterializedTable);
+        updateRefreshHandler(
+                operationExecutor,
+                handle,
+                tableIdentifier,
+                materializedTable,
+                CatalogMaterializedTable.RefreshStatus.SUSPENDED,
+                updateRefreshHandler.asSummaryString(),
+                serializeContinuousHandler(updateRefreshHandler));
+    }
 
-        operationExecutor.callExecutableOperation(handle, 
alterMaterializedTableChangeOperation);
+    private void suspendRefreshWorkflow(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            ObjectIdentifier tableIdentifier,
+            CatalogMaterializedTable materializedTable) {
 
-        return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+        try {
+            RefreshHandlerSerializer refreshHandlerSerializer =

Review Comment:
   ```suggestion
               RefreshHandlerSerializer<?> refreshHandlerSerializer =
   ```



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -652,6 +654,158 @@ void 
testAlterMaterializedTableWithoutSavepointDirConfigured() throws Exception
                         "Savepoint directory is not configured, can't stop job 
with savepoint.");
     }
 
+    @Test
+    void testAlterMaterializedSuspendAndResumeInFullMode() throws Exception {
+        createMaterializedTableInFullMode("users_shops", 
Collections.emptyList());
+
+        ResolvedCatalogMaterializedTable activeMaterializedTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(activeMaterializedTable.getRefreshStatus())
+                .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);
+
+        // suspend materialized table
+        String alterMaterializedTableSuspendDDL = "ALTER MATERIALIZED TABLE 
users_shops SUSPEND";
+        OperationHandle alterMaterializedTableSuspendHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableSuspendDDL, -1, 
new Configuration());
+
+        awaitOperationTermination(service, sessionHandle, 
alterMaterializedTableSuspendHandle);
+
+        ResolvedCatalogMaterializedTable suspendMaterializedTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(suspendMaterializedTable.getRefreshStatus())
+                .isEqualTo(CatalogMaterializedTable.RefreshStatus.SUSPENDED);
+
+        // verify workflow is suspended
+        byte[] refreshHandler = 
suspendMaterializedTable.getSerializedRefreshHandler();
+        EmbeddedRefreshHandler suspendRefreshHandler =
+                EmbeddedRefreshHandlerSerializer.INSTANCE.deserialize(
+                        refreshHandler, getClass().getClassLoader());
+
+        String workflowName = suspendRefreshHandler.getWorkflowName();
+        String workflowGroup = suspendRefreshHandler.getWorkflowGroup();
+        EmbeddedQuartzScheduler embeddedWorkflowScheduler =
+                SQL_GATEWAY_REST_ENDPOINT_EXTENSION
+                        .getSqlGatewayRestEndpoint()
+                        .getQuartzScheduler();
+        JobKey jobKey = new JobKey(workflowName, workflowGroup);
+        Trigger.TriggerState suspendTriggerState =
+                embeddedWorkflowScheduler
+                        .getQuartzScheduler()
+                        .getTriggerState(new TriggerKey(workflowName, 
workflowGroup));
+
+        assertThat(suspendTriggerState).isEqualTo(Trigger.TriggerState.PAUSED);
+
+        // resume materialized table
+        String alterMaterializedTableResumeDDL =
+                "ALTER MATERIALIZED TABLE users_shops RESUME WITH 
('debezium-json.ignore-parse-errors' = 'true')";
+        OperationHandle alterMaterializedTableResumeHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableResumeDDL, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, 
alterMaterializedTableResumeHandle);
+
+        ResolvedCatalogMaterializedTable resumedCatalogMaterializedTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(resumedCatalogMaterializedTable.getOptions())
+                .doesNotContainKey("debezium-json.ignore-parse-errors");

Review Comment:
   I don't think we need to check this config key in the table options.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -328,24 +367,81 @@ private ResultFetcher callAlterMaterializedTableResume(
                 getCatalogMaterializedTable(operationExecutor, 
tableIdentifier);
 
         if (CatalogMaterializedTable.RefreshMode.CONTINUOUS
-                != catalogMaterializedTable.getRefreshMode()) {
-            throw new SqlExecutionException(
-                    "Only support resume continuous refresh job currently.");
+                == catalogMaterializedTable.getRefreshMode()) {
+            resumeContinuousRefreshJob(
+                    operationExecutor,
+                    handle,
+                    tableIdentifier,
+                    catalogMaterializedTable,
+                    op.getDynamicOptions());
+        } else {
+            resumeRefreshWorkflow(
+                    operationExecutor,
+                    handle,
+                    tableIdentifier,
+                    catalogMaterializedTable,
+                    op.getDynamicOptions());
         }
 
-        ContinuousRefreshHandler continuousRefreshHandler =
+        return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+    }
+
+    private void resumeContinuousRefreshJob(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            ObjectIdentifier tableIdentifier,
+            CatalogMaterializedTable catalogMaterializedTable,
+            Map<String, String> dynamicOptions) {
+        ContinuousRefreshHandler refreshHandler =
                 deserializeContinuousHandler(
                         
catalogMaterializedTable.getSerializedRefreshHandler());
-        Optional<String> restorePath = 
continuousRefreshHandler.getRestorePath();
+
+        Optional<String> restorePath = refreshHandler.getRestorePath();
         executeContinuousRefreshJob(
                 operationExecutor,
                 handle,
                 catalogMaterializedTable,
                 tableIdentifier,
-                op.getDynamicOptions(),
+                dynamicOptions,
                 restorePath);
+    }
 
-        return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+    private void resumeRefreshWorkflow(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            ObjectIdentifier tableIdentifier,
+            CatalogMaterializedTable catalogMaterializedTable,
+            Map<String, String> dynamicOptions) {
+        try {
+            RefreshHandlerSerializer refreshHandlerSerializer =
+                    workflowScheduler.getRefreshHandlerSerializer();
+            RefreshHandler refreshHandler =
+                    refreshHandlerSerializer.deserialize(
+                            
catalogMaterializedTable.getSerializedRefreshHandler(),
+                            userCodeClassLoader);
+            ModifyRefreshWorkflow modifyRefreshWorkflow =
+                    new ResumeRefreshWorkflow(refreshHandler, dynamicOptions);
+            workflowScheduler.modifyRefreshWorkflow(modifyRefreshWorkflow);
+
+            updateRefreshHandler(
+                    operationExecutor,
+                    handle,
+                    tableIdentifier,
+                    catalogMaterializedTable,
+                    CatalogMaterializedTable.RefreshStatus.ACTIVATED,
+                    refreshHandler.asSummaryString(),
+                    catalogMaterializedTable.getSerializedRefreshHandler());
+        } catch (Exception e) {
+            LOG.error(
+                    "Failed to resume the refresh job for materialized table 
{}.",

Review Comment:
   ```suggestion
                       "Failed to resume the refresh workflow for materialized 
table {}.",
   ```



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -652,6 +654,158 @@ void 
testAlterMaterializedTableWithoutSavepointDirConfigured() throws Exception
                         "Savepoint directory is not configured, can't stop job 
with savepoint.");
     }
 
+    @Test
+    void testAlterMaterializedSuspendAndResumeInFullMode() throws Exception {
+        createMaterializedTableInFullMode("users_shops", 
Collections.emptyList());
+
+        ResolvedCatalogMaterializedTable activeMaterializedTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(activeMaterializedTable.getRefreshStatus())
+                .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);
+
+        // suspend materialized table
+        String alterMaterializedTableSuspendDDL = "ALTER MATERIALIZED TABLE 
users_shops SUSPEND";
+        OperationHandle alterMaterializedTableSuspendHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableSuspendDDL, -1, 
new Configuration());
+
+        awaitOperationTermination(service, sessionHandle, 
alterMaterializedTableSuspendHandle);
+
+        ResolvedCatalogMaterializedTable suspendMaterializedTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(suspendMaterializedTable.getRefreshStatus())
+                .isEqualTo(CatalogMaterializedTable.RefreshStatus.SUSPENDED);
+
+        // verify workflow is suspended
+        byte[] refreshHandler = 
suspendMaterializedTable.getSerializedRefreshHandler();
+        EmbeddedRefreshHandler suspendRefreshHandler =
+                EmbeddedRefreshHandlerSerializer.INSTANCE.deserialize(
+                        refreshHandler, getClass().getClassLoader());
+
+        String workflowName = suspendRefreshHandler.getWorkflowName();
+        String workflowGroup = suspendRefreshHandler.getWorkflowGroup();
+        EmbeddedQuartzScheduler embeddedWorkflowScheduler =
+                SQL_GATEWAY_REST_ENDPOINT_EXTENSION
+                        .getSqlGatewayRestEndpoint()
+                        .getQuartzScheduler();
+        JobKey jobKey = new JobKey(workflowName, workflowGroup);
+        Trigger.TriggerState suspendTriggerState =
+                embeddedWorkflowScheduler
+                        .getQuartzScheduler()
+                        .getTriggerState(new TriggerKey(workflowName, 
workflowGroup));
+
+        assertThat(suspendTriggerState).isEqualTo(Trigger.TriggerState.PAUSED);
+
+        // resume materialized table
+        String alterMaterializedTableResumeDDL =
+                "ALTER MATERIALIZED TABLE users_shops RESUME WITH 
('debezium-json.ignore-parse-errors' = 'true')";
+        OperationHandle alterMaterializedTableResumeHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableResumeDDL, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, 
alterMaterializedTableResumeHandle);
+
+        ResolvedCatalogMaterializedTable resumedCatalogMaterializedTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(resumedCatalogMaterializedTable.getOptions())
+                .doesNotContainKey("debezium-json.ignore-parse-errors");
+        assertThat(resumedCatalogMaterializedTable.getRefreshStatus())
+                .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);
+
+        // verify workflow is resumed
+        refreshHandler = 
resumedCatalogMaterializedTable.getSerializedRefreshHandler();
+        EmbeddedRefreshHandler resumeRefreshHandler =
+                EmbeddedRefreshHandlerSerializer.INSTANCE.deserialize(
+                        refreshHandler, getClass().getClassLoader());
+
+        
assertThat(resumeRefreshHandler.getWorkflowName()).isEqualTo(workflowName);
+        
assertThat(resumeRefreshHandler.getWorkflowGroup()).isEqualTo(workflowGroup);
+
+        JobDetail jobDetail = 
embeddedWorkflowScheduler.getQuartzScheduler().getJobDetail(jobKey);
+        Trigger.TriggerState resumedTriggerState =
+                embeddedWorkflowScheduler
+                        .getQuartzScheduler()
+                        .getTriggerState(new TriggerKey(workflowName, 
workflowGroup));

Review Comment:
   ditto



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/workflow/scheduler/EmbeddedQuartzScheduler.java:
##########
@@ -210,7 +226,37 @@ public void resumeScheduleWorkflow(String workflowName, 
String workflowGroup)
                             "Failed to resume a non-existent quartz schedule 
job: %s.", jobKey);
             checkJobExists(jobKey, errorMsg);
 
-            quartzScheduler.resumeJob(jobKey);
+            if (dynamicOptions.isEmpty()) {
+                quartzScheduler.resumeJob(jobKey);
+            } else {
+                // remove old job and create a new job with new dynamic options
+                JobDetail jobDetail = quartzScheduler.getJobDetail(jobKey);
+                WorkflowInfo workflowInfo =
+                        fromJson(
+                                
jobDetail.getJobDataMap().getString(WORKFLOW_INFO),
+                                WorkflowInfo.class);
+                // create a new job with new dynamic options
+                WorkflowInfo newWorkflowInfo =
+                        new WorkflowInfo(
+                                workflowInfo.getMaterializedTableIdentifier(),
+                                dynamicOptions,
+                                workflowInfo.getInitConfig(),
+                                workflowInfo.getExecutionConfig(),
+                                workflowInfo.getCustomSchedulerTime(),
+                                workflowInfo.getRestEndpointUrl());
+
+                // create a new job
+                Trigger trigger =

Review Comment:
   ```
                   CronTrigger trigger =
                           (CronTrigger)
                                   quartzScheduler.getTrigger(
                                           
TriggerKey.triggerKey(jobKey.getName(), jobKey.getGroup()));
   ```



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -652,6 +654,158 @@ void 
testAlterMaterializedTableWithoutSavepointDirConfigured() throws Exception
                         "Savepoint directory is not configured, can't stop job 
with savepoint.");
     }
 
+    @Test
+    void testAlterMaterializedSuspendAndResumeInFullMode() throws Exception {
+        createMaterializedTableInFullMode("users_shops", 
Collections.emptyList());
+
+        ResolvedCatalogMaterializedTable activeMaterializedTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(activeMaterializedTable.getRefreshStatus())
+                .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);
+
+        // suspend materialized table
+        String alterMaterializedTableSuspendDDL = "ALTER MATERIALIZED TABLE 
users_shops SUSPEND";
+        OperationHandle alterMaterializedTableSuspendHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableSuspendDDL, -1, 
new Configuration());
+
+        awaitOperationTermination(service, sessionHandle, 
alterMaterializedTableSuspendHandle);
+
+        ResolvedCatalogMaterializedTable suspendMaterializedTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(suspendMaterializedTable.getRefreshStatus())
+                .isEqualTo(CatalogMaterializedTable.RefreshStatus.SUSPENDED);
+
+        // verify workflow is suspended
+        byte[] refreshHandler = 
suspendMaterializedTable.getSerializedRefreshHandler();
+        EmbeddedRefreshHandler suspendRefreshHandler =
+                EmbeddedRefreshHandlerSerializer.INSTANCE.deserialize(
+                        refreshHandler, getClass().getClassLoader());
+
+        String workflowName = suspendRefreshHandler.getWorkflowName();
+        String workflowGroup = suspendRefreshHandler.getWorkflowGroup();
+        EmbeddedQuartzScheduler embeddedWorkflowScheduler =
+                SQL_GATEWAY_REST_ENDPOINT_EXTENSION
+                        .getSqlGatewayRestEndpoint()
+                        .getQuartzScheduler();
+        JobKey jobKey = new JobKey(workflowName, workflowGroup);

Review Comment:
   ```suggestion
           JobKey jobKey = JobKey.jobKey(workflowName, workflowGroup);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to