This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4420805067303841524f7b9987b33c8266b8cb87
Author: Feng Jin <jinfeng1...@gmail.com>
AuthorDate: Sun Jan 5 20:06:59 2025 +0800

    [FLINK-36994][table] Support executing ALTER MATERIALIZED TABLE AS operation
---
 .../MaterializedTableManager.java                  | 173 +++++++++-
 .../service/MaterializedTableStatementITCase.java  | 349 +++++++++++++++++++++
 2 files changed, 518 insertions(+), 4 deletions(-)

diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
index 2398e72172a..2c2ea78f100 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.TableChange.MaterializedTableChange;
 import org.apache.flink.table.data.GenericMapData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
@@ -48,6 +49,7 @@ import 
org.apache.flink.table.gateway.service.result.ResultFetcher;
 import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
 import org.apache.flink.table.operations.command.DescribeJobOperation;
 import org.apache.flink.table.operations.command.StopJobOperation;
+import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation;
 import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperation;
 import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableRefreshOperation;
 import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableResumeOperation;
@@ -174,6 +176,9 @@ public class MaterializedTableManager {
         } else if (op instanceof DropMaterializedTableOperation) {
             return callDropMaterializedTableOperation(
                     operationExecutor, handle, 
(DropMaterializedTableOperation) op);
+        } else if (op instanceof AlterMaterializedTableAsQueryOperation) {
+            return callAlterMaterializedTableAsQueryOperation(
+                    operationExecutor, handle, 
(AlterMaterializedTableAsQueryOperation) op);
         }
 
         throw new SqlExecutionException(
@@ -315,7 +320,7 @@ public class MaterializedTableManager {
         return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
     }
 
-    private void suspendContinuousRefreshJob(
+    private CatalogMaterializedTable suspendContinuousRefreshJob(
             OperationExecutor operationExecutor,
             OperationHandle handle,
             ObjectIdentifier tableIdentifier,
@@ -341,7 +346,7 @@ public class MaterializedTableManager {
                             refreshHandler.getJobId(),
                             savepointPath);
 
-            updateRefreshHandler(
+            return updateRefreshHandler(
                     operationExecutor,
                     handle,
                     tableIdentifier,
@@ -804,6 +809,164 @@ public class MaterializedTableManager {
         return insertStatement.toString();
     }
 
+    private ResultFetcher callAlterMaterializedTableAsQueryOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableAsQueryOperation op) {
+        ObjectIdentifier tableIdentifier = op.getTableIdentifier();
+        CatalogMaterializedTable oldMaterializedTable =
+                getCatalogMaterializedTable(operationExecutor, 
tableIdentifier);
+
+        if (CatalogMaterializedTable.RefreshMode.FULL == 
oldMaterializedTable.getRefreshMode()) {
+            // directly apply the alter operation
+            AlterMaterializedTableChangeOperation 
alterMaterializedTableChangeOperation =
+                    new AlterMaterializedTableChangeOperation(
+                            tableIdentifier, op.getTableChanges(), 
op.getNewMaterializedTable());
+            return operationExecutor.callExecutableOperation(
+                    handle, alterMaterializedTableChangeOperation);
+        }
+
+        if (CatalogMaterializedTable.RefreshStatus.ACTIVATED
+                == oldMaterializedTable.getRefreshStatus()) {
+            // 1. suspend the materialized table
+            CatalogMaterializedTable suspendMaterializedTable =
+                    suspendContinuousRefreshJob(
+                            operationExecutor, handle, tableIdentifier, 
oldMaterializedTable);
+
+            // 2. alter materialized table schema & query definition
+            CatalogMaterializedTable updatedMaterializedTable =
+                    op.getNewMaterializedTable()
+                            .copy(
+                                    
suspendMaterializedTable.getRefreshStatus(),
+                                    suspendMaterializedTable
+                                            .getRefreshHandlerDescription()
+                                            .orElse(null),
+                                    
suspendMaterializedTable.getSerializedRefreshHandler());
+            AlterMaterializedTableChangeOperation 
alterMaterializedTableChangeOperation =
+                    new AlterMaterializedTableChangeOperation(
+                            tableIdentifier, op.getTableChanges(), 
updatedMaterializedTable);
+            operationExecutor.callExecutableOperation(
+                    handle, alterMaterializedTableChangeOperation);
+
+            // 3. resume the materialized table
+            try {
+                executeContinuousRefreshJob(
+                        operationExecutor,
+                        handle,
+                        updatedMaterializedTable,
+                        tableIdentifier,
+                        Collections.emptyMap(),
+                        Optional.empty());
+            } catch (Exception e) {
+                // Roll back the changes to the materialized table and restore 
the continuous
+                // refresh job
+                LOG.warn(
+                        "Failed to start the continuous refresh job for 
materialized table {} using new query {}, rollback to origin query {}.",
+                        tableIdentifier,
+                        op.getNewMaterializedTable().getDefinitionQuery(),
+                        suspendMaterializedTable.getDefinitionQuery(),
+                        e);
+
+                AlterMaterializedTableChangeOperation rollbackChangeOperation =
+                        generateRollbackAlterMaterializedTableOperation(
+                                suspendMaterializedTable, 
alterMaterializedTableChangeOperation);
+                operationExecutor.callExecutableOperation(handle, 
rollbackChangeOperation);
+
+                ContinuousRefreshHandler continuousRefreshHandler =
+                        deserializeContinuousHandler(
+                                
suspendMaterializedTable.getSerializedRefreshHandler());
+                executeContinuousRefreshJob(
+                        operationExecutor,
+                        handle,
+                        suspendMaterializedTable,
+                        tableIdentifier,
+                        Collections.emptyMap(),
+                        continuousRefreshHandler.getRestorePath());
+
+                throw new SqlExecutionException(
+                        String.format(
+                                "Failed to start the continuous refresh job 
using new query %s when altering materialized table %s select query.",
+                                
op.getNewMaterializedTable().getDefinitionQuery(), tableIdentifier),
+                        e);
+            }
+        } else if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
+                == oldMaterializedTable.getRefreshStatus()) {
+            // alter schema & definition query & refresh handler (reset 
savepoint path of refresh
+            // handler)
+            List<MaterializedTableChange> tableChanges = new 
ArrayList<>(op.getTableChanges());
+            TableChange.ModifyRefreshHandler modifyRefreshHandler =
+                    genereateResetSavepointTableChange(
+                            
oldMaterializedTable.getSerializedRefreshHandler());
+            tableChanges.add(modifyRefreshHandler);
+
+            CatalogMaterializedTable updatedMaterializedTable =
+                    op.getNewMaterializedTable()
+                            .copy(
+                                    oldMaterializedTable.getRefreshStatus(),
+                                    
modifyRefreshHandler.getRefreshHandlerDesc(),
+                                    
modifyRefreshHandler.getRefreshHandlerBytes());
+            AlterMaterializedTableChangeOperation 
alterMaterializedTableChangeOperation =
+                    new AlterMaterializedTableChangeOperation(
+                            tableIdentifier, tableChanges, 
updatedMaterializedTable);
+
+            operationExecutor.callExecutableOperation(
+                    handle, alterMaterializedTableChangeOperation);
+        } else {
+            throw new SqlExecutionException(
+                    String.format(
+                            "Materialized table %s is being initialized and 
does not support alter operation.",
+                            tableIdentifier));
+        }
+
+        return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+    }
+
+    private AlterMaterializedTableChangeOperation 
generateRollbackAlterMaterializedTableOperation(
+            CatalogMaterializedTable oldMaterializedTable,
+            AlterMaterializedTableChangeOperation op) {
+        List<MaterializedTableChange> tableChanges = op.getTableChanges();
+        List<MaterializedTableChange> rollbackChanges = new ArrayList<>();
+
+        for (TableChange tableChange : tableChanges) {
+            if (tableChange instanceof TableChange.AddColumn) {
+                TableChange.AddColumn addColumn = (TableChange.AddColumn) 
tableChange;
+                
rollbackChanges.add(TableChange.dropColumn(addColumn.getColumn().getName()));
+            } else if (tableChange instanceof 
TableChange.ModifyRefreshHandler) {
+                rollbackChanges.add(
+                        TableChange.modifyRefreshHandler(
+                                
oldMaterializedTable.getRefreshHandlerDescription().orElse(null),
+                                
oldMaterializedTable.getSerializedRefreshHandler()));
+            } else if (tableChange instanceof 
TableChange.ModifyDefinitionQuery) {
+                rollbackChanges.add(
+                        TableChange.modifyDefinitionQuery(
+                                oldMaterializedTable.getDefinitionQuery()));
+            } else {
+                throw new ValidationException(
+                        String.format(
+                                "Failed to generate rollback changes for 
materialized table '%s'. "
+                                        + "Unsupported table change detected: 
%s. ",
+                                op.getTableIdentifier(), tableChange));
+            }
+        }
+
+        return new AlterMaterializedTableChangeOperation(
+                op.getTableIdentifier(), rollbackChanges, 
oldMaterializedTable);
+    }
+
+    private TableChange.ModifyRefreshHandler 
genereateResetSavepointTableChange(
+            byte[] serializedContinuousHandler) {
+        ContinuousRefreshHandler continuousRefreshHandler =
+                deserializeContinuousHandler(serializedContinuousHandler);
+        ContinuousRefreshHandler resetedRefreshHandler =
+                new ContinuousRefreshHandler(
+                        continuousRefreshHandler.getExecutionTarget(),
+                        continuousRefreshHandler.getJobId());
+
+        return TableChange.modifyRefreshHandler(
+                resetedRefreshHandler.asSummaryString(),
+                serializeContinuousHandler(resetedRefreshHandler));
+    }
+
     private ResultFetcher callDropMaterializedTableOperation(
             OperationExecutor operationExecutor,
             OperationHandle handle,
@@ -1018,7 +1181,7 @@ public class MaterializedTableManager {
         return (ResolvedCatalogMaterializedTable) resolvedCatalogBaseTable;
     }
 
-    private void updateRefreshHandler(
+    private CatalogMaterializedTable updateRefreshHandler(
             OperationExecutor operationExecutor,
             OperationHandle operationHandle,
             ObjectIdentifier materializedTableIdentifier,
@@ -1029,7 +1192,7 @@ public class MaterializedTableManager {
         CatalogMaterializedTable updatedMaterializedTable =
                 catalogMaterializedTable.copy(
                         refreshStatus, refreshHandlerSummary, 
serializedRefreshHandler);
-        List<TableChange> tableChanges = new ArrayList<>();
+        List<MaterializedTableChange> tableChanges = new ArrayList<>();
         tableChanges.add(TableChange.modifyRefreshStatus(refreshStatus));
         tableChanges.add(
                 TableChange.modifyRefreshHandler(refreshHandlerSummary, 
serializedRefreshHandler));
@@ -1039,6 +1202,8 @@ public class MaterializedTableManager {
         // update RefreshHandler to Catalog
         operationExecutor.callExecutableOperation(
                 operationHandle, alterMaterializedTableChangeOperation);
+
+        return updatedMaterializedTable;
     }
 
     /** Generate insert statement for materialized table. */
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
index b5f5fc3bd5f..49754e42aad 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
@@ -72,6 +72,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static 
org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTING_INTERVAL;
 import static 
org.apache.flink.table.api.config.TableConfigOptions.RESOURCES_DOWNLOAD_DIR;
@@ -1027,6 +1028,348 @@ public class MaterializedTableStatementITCase extends 
AbstractMaterializedTableS
                                         fileSystemCatalogName,
                                         TEST_DEFAULT_DATABASE,
                                         "users_shops")));
+
+        dropMaterializedTable(
+                ObjectIdentifier.of(fileSystemCatalogName, 
TEST_DEFAULT_DATABASE, "users_shops"));
+    }
+
+    @Test
+    void testAlterMaterializedTableAsQueryInFullMode() throws Exception {
+        createAndVerifyCreateMaterializedTableWithData(
+                "users_shops", Collections.emptyList(), 
Collections.emptyMap(), RefreshMode.FULL);
+
+        ResolvedCatalogMaterializedTable oldTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        // Alter materialized table as query in full mode
+        String alterMaterializedTableAsQueryDDL =
+                "ALTER MATERIALIZED TABLE users_shops"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  COUNT(order_id) AS order_cnt,\n"
+                        + "  SUM(order_amount) AS order_amount_sum\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, order_created_at AS 
ds, order_id, 1 as order_amount FROM my_source"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+
+        OperationHandle alterMaterializedTableAsQueryHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableAsQueryDDL, -1, 
new Configuration());
+
+        awaitOperationTermination(service, sessionHandle, 
alterMaterializedTableAsQueryHandle);
+
+        // verify the altered materialized table
+        ResolvedCatalogMaterializedTable newTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(getAddedColumns(newTable.getResolvedSchema(), 
oldTable.getResolvedSchema()))
+                .isEqualTo(
+                        Collections.singletonList(
+                                Column.physical("order_amount_sum", 
DataTypes.INT())));
+        assertThat(newTable.getDefinitionQuery())
+                .isEqualTo(
+                        String.format(
+                                "SELECT `tmp`.`user_id`, `tmp`.`shop_id`, 
`tmp`.`ds`, COUNT(`tmp`.`order_id`) AS `order_cnt`, SUM(`tmp`.`order_amount`) 
AS `order_amount_sum`\n"
+                                        + "FROM (SELECT `my_source`.`user_id`, 
`my_source`.`shop_id`, `my_source`.`order_created_at` AS `ds`, 
`my_source`.`order_id`, 1 AS `order_amount`\n"
+                                        + "FROM `%s`.`test_db`.`my_source`) AS 
`tmp`\n"
+                                        + "GROUP BY ROW(`tmp`.`user_id`, 
`tmp`.`shop_id`, `tmp`.`ds`)",
+                                fileSystemCatalogName));
+        // the refresh handler in full mode should be the same as the old one
+        assertThat(oldTable.getSerializedRefreshHandler())
+                .isEqualTo(newTable.getSerializedRefreshHandler());
+        
assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness());
+    }
+
+    @Test
+    void testAlterMaterializedTableAsQueryInFullModeWithSuspendStatus() throws 
Exception {
+        createAndVerifyCreateMaterializedTableWithData(
+                "users_shops", Collections.emptyList(), 
Collections.emptyMap(), RefreshMode.FULL);
+
+        ResolvedCatalogMaterializedTable oldTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        // Alter materialized table suspend
+        String alterMaterializedTableSuspendDDL = "ALTER MATERIALIZED TABLE 
users_shops SUSPEND";
+        OperationHandle alterMaterializedTableSuspendHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableSuspendDDL, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, 
alterMaterializedTableSuspendHandle);
+
+        // Alter materialized table as query in full mode
+        String alterMaterializedTableAsQueryDDL =
+                "ALTER MATERIALIZED TABLE users_shops"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  COUNT(order_id) AS order_cnt,\n"
+                        + "  SUM(order_amount) AS order_amount_sum\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, order_created_at AS 
ds, order_id, 1 as order_amount FROM my_source"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+
+        OperationHandle alterMaterializedTableAsQueryHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableAsQueryDDL, -1, 
new Configuration());
+
+        awaitOperationTermination(service, sessionHandle, 
alterMaterializedTableAsQueryHandle);
+
+        // verify the altered materialized table
+        ResolvedCatalogMaterializedTable newTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(getAddedColumns(newTable.getResolvedSchema(), 
oldTable.getResolvedSchema()))
+                .isEqualTo(
+                        Collections.singletonList(
+                                Column.physical("order_amount_sum", 
DataTypes.INT())));
+        assertThat(newTable.getDefinitionQuery())
+                .isEqualTo(
+                        String.format(
+                                "SELECT `tmp`.`user_id`, `tmp`.`shop_id`, 
`tmp`.`ds`, COUNT(`tmp`.`order_id`) AS `order_cnt`, SUM(`tmp`.`order_amount`) 
AS `order_amount_sum`\n"
+                                        + "FROM (SELECT `my_source`.`user_id`, 
`my_source`.`shop_id`, `my_source`.`order_created_at` AS `ds`, 
`my_source`.`order_id`, 1 AS `order_amount`\n"
+                                        + "FROM `%s`.`test_db`.`my_source`) AS 
`tmp`\n"
+                                        + "GROUP BY ROW(`tmp`.`user_id`, 
`tmp`.`shop_id`, `tmp`.`ds`)",
+                                fileSystemCatalogName));
+
+        // the refresh handler in full mode should be the same as the old one
+        assertThat(oldTable.getSerializedRefreshHandler())
+                .isEqualTo(newTable.getSerializedRefreshHandler());
+        
assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness());
+    }
+
+    @Test
+    void testAlterMaterializedTableAsQueryInContinuousMode(@TempDir Path 
temporaryPath)
+            throws Exception {
+        String materializedTableDDL =
+                "CREATE MATERIALIZED TABLE users_shops ("
+                        + " PRIMARY KEY (ds, user_id) not enforced)"
+                        + " PARTITIONED BY (ds)\n"
+                        + " WITH(\n"
+                        + "   'format' = 'debezium-json'\n"
+                        + " )\n"
+                        + " FRESHNESS = INTERVAL '30' SECOND\n"
+                        + " AS SELECT \n"
+                        + "  coalesce(user_id, 0) as user_id,\n"
+                        + "  shop_id,\n"
+                        + "  coalesce(ds, '') as ds,\n"
+                        + "  SUM (payment_amount_cents) AS payed_buy_fee_sum\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, 
DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM 
datagenSource"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+        OperationHandle materializedTableHandle =
+                service.executeStatement(
+                        sessionHandle, materializedTableDDL, -1, new 
Configuration());
+        awaitOperationTermination(service, sessionHandle, 
materializedTableHandle);
+        ResolvedCatalogMaterializedTable oldTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        // verify background job is running
+        ContinuousRefreshHandler activeRefreshHandler =
+                ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
+                        oldTable.getSerializedRefreshHandler(), 
getClass().getClassLoader());
+        waitUntilAllTasksAreRunning(
+                restClusterClient, 
JobID.fromHexString(activeRefreshHandler.getJobId()));
+
+        // setup savepoint dir
+        String savepointDir = "file://" + temporaryPath.toAbsolutePath();
+        String setupSavepointDDL =
+                "SET 'execution.checkpointing.savepoint-dir' = '" + 
savepointDir + "'";
+        OperationHandle setupSavepointHandle =
+                service.executeStatement(sessionHandle, setupSavepointDDL, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, 
setupSavepointHandle);
+
+        String alterTableDDL =
+                "ALTER MATERIALIZED TABLE users_shops"
+                        + " AS SELECT \n"
+                        + "  coalesce(user_id, 0) as user_id,\n"
+                        + "  shop_id,\n"
+                        + "  coalesce(ds, '') as ds,\n"
+                        + "  SUM (payment_amount_cents) AS 
payed_buy_fee_sum,\n"
+                        + "  SUM (1) AS pv\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, 
DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM 
datagenSource"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+        OperationHandle alterTableHandle =
+                service.executeStatement(sessionHandle, alterTableDDL, -1, new 
Configuration());
+        awaitOperationTermination(service, sessionHandle, alterTableHandle);
+        ResolvedCatalogMaterializedTable newTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(getAddedColumns(newTable.getResolvedSchema(), 
oldTable.getResolvedSchema()))
+                .isEqualTo(Collections.singletonList(Column.physical("pv", 
DataTypes.INT())));
+        assertThat(newTable.getResolvedSchema().getPrimaryKey())
+                .isEqualTo(oldTable.getResolvedSchema().getPrimaryKey());
+        assertThat(newTable.getResolvedSchema().getWatermarkSpecs())
+                .isEqualTo(oldTable.getResolvedSchema().getWatermarkSpecs());
+        assertThat(newTable.getDefinitionQuery())
+                .isEqualTo(
+                        String.format(
+                                "SELECT COALESCE(`tmp`.`user_id`, 0) AS 
`user_id`, `tmp`.`shop_id`, COALESCE(`tmp`.`ds`, '') AS `ds`, 
SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`\n"
+                                        + "FROM (SELECT 
`datagenSource`.`user_id`, `datagenSource`.`shop_id`, 
`DATE_FORMAT`(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`, 
`datagenSource`.`payment_amount_cents`\n"
+                                        + "FROM 
`%s`.`test_db`.`datagenSource`) AS `tmp`\n"
+                                        + "GROUP BY ROW(`tmp`.`user_id`, 
`tmp`.`shop_id`, `tmp`.`ds`)",
+                                fileSystemCatalogName));
+        assertThat(oldTable.getSerializedRefreshHandler())
+                .isNotEqualTo(newTable.getSerializedRefreshHandler());
+
+        // verify the new continuous job is start without savepoint
+        ContinuousRefreshHandler newContinuousRefreshHandler =
+                ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
+                        newTable.getSerializedRefreshHandler(),
+                        Thread.currentThread().getContextClassLoader());
+        Optional<String> restorePath =
+                getJobRestoreSavepointPath(
+                        restClusterClient, 
newContinuousRefreshHandler.getJobId());
+        assertThat(restorePath).isEmpty();
+
+        // drop the materialized table
+        dropMaterializedTable(
+                ObjectIdentifier.of(fileSystemCatalogName, 
TEST_DEFAULT_DATABASE, "users_shops"));
+    }
+
+    @Test
+    void testAlterMaterializedTableAsQueryInContinuousModeWithSuspendStatus(
+            @TempDir Path temporaryPath) throws Exception {
+        String materializedTableDDL =
+                "CREATE MATERIALIZED TABLE users_shops"
+                        + " PARTITIONED BY (ds)\n"
+                        + " WITH(\n"
+                        + "   'format' = 'debezium-json'\n"
+                        + " )\n"
+                        + " FRESHNESS = INTERVAL '30' SECOND\n"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  SUM (payment_amount_cents) AS payed_buy_fee_sum\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, 
DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM 
datagenSource"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+        OperationHandle materializedTableHandle =
+                service.executeStatement(
+                        sessionHandle, materializedTableDDL, -1, new 
Configuration());
+        awaitOperationTermination(service, sessionHandle, 
materializedTableHandle);
+        ResolvedCatalogMaterializedTable oldTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        // verify background job is running
+        ContinuousRefreshHandler activeRefreshHandler =
+                ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
+                        oldTable.getSerializedRefreshHandler(), 
getClass().getClassLoader());
+        waitUntilAllTasksAreRunning(
+                restClusterClient, 
JobID.fromHexString(activeRefreshHandler.getJobId()));
+
+        // setup savepoint dir
+        String savepointDir = "file://" + temporaryPath.toAbsolutePath();
+        String setupSavepointDDL =
+                "SET 'execution.checkpointing.savepoint-dir' = '" + 
savepointDir + "'";
+        OperationHandle setupSavepointHandle =
+                service.executeStatement(sessionHandle, setupSavepointDDL, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, 
setupSavepointHandle);
+
+        // suspend materialized table
+        String suspendTableDDL = "ALTER MATERIALIZED TABLE users_shops 
SUSPEND";
+        OperationHandle suspendTableHandle =
+                service.executeStatement(sessionHandle, suspendTableDDL, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, suspendTableHandle);
+
+        String alterTableDDL =
+                "ALTER MATERIALIZED TABLE users_shops"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  SUM (payment_amount_cents) AS 
payed_buy_fee_sum,\n"
+                        + "  SUM (1) AS pv\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, 
DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM 
datagenSource"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+        OperationHandle alterTableHandle =
+                service.executeStatement(sessionHandle, alterTableDDL, -1, new 
Configuration());
+        awaitOperationTermination(service, sessionHandle, alterTableHandle);
+        ResolvedCatalogMaterializedTable newTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(getAddedColumns(newTable.getResolvedSchema(), 
oldTable.getResolvedSchema()))
+                .isEqualTo(Collections.singletonList(Column.physical("pv", 
DataTypes.INT())));
+        assertThat(newTable.getDefinitionQuery())
+                .isEqualTo(
+                        String.format(
+                                "SELECT `tmp`.`user_id`, `tmp`.`shop_id`, 
`tmp`.`ds`, SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS 
`pv`\n"
+                                        + "FROM (SELECT 
`datagenSource`.`user_id`, `datagenSource`.`shop_id`, 
`DATE_FORMAT`(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`, 
`datagenSource`.`payment_amount_cents`\n"
+                                        + "FROM 
`%s`.`test_db`.`datagenSource`) AS `tmp`\n"
+                                        + "GROUP BY ROW(`tmp`.`user_id`, 
`tmp`.`shop_id`, `tmp`.`ds`)",
+                                fileSystemCatalogName));
+        assertThat(oldTable.getSerializedRefreshHandler())
+                .isEqualTo(newTable.getSerializedRefreshHandler());
+        
assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness());
+
+        // verify the restore path is empty
+        ContinuousRefreshHandler newContinuousRefreshHandler =
+                ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
+                        newTable.getSerializedRefreshHandler(),
+                        Thread.currentThread().getContextClassLoader());
+        assertThat(newContinuousRefreshHandler.getRestorePath()).isEmpty();
+
+        // drop the materialized table
+        dropMaterializedTable(
+                ObjectIdentifier.of(fileSystemCatalogName, 
TEST_DEFAULT_DATABASE, "users_shops"));
     }
 
     @Test
@@ -1574,6 +1917,12 @@ public class MaterializedTableStatementITCase extends 
AbstractMaterializedTableS
                         fileSystemCatalogName, TEST_DEFAULT_DATABASE, 
"my_materialized_table"));
     }
 
+    private List<Column> getAddedColumns(ResolvedSchema newSchema, 
ResolvedSchema oldSchema) {
+        return newSchema.getColumns().stream()
+                .filter(column -> !oldSchema.getColumns().contains(column))
+                .collect(Collectors.toList());
+    }
+
     private int getPartitionSize(List<Row> data, String partition) {
         return (int)
                 data.stream().filter(row -> 
row.getField(3).toString().equals(partition)).count();


Reply via email to