lsyldliu commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1597326389


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode(
                     "Submit continuous refresh job for materialized table {} 
occur exception.",
                     materializedTableIdentifier,
                     e);
-            throw new TableException(
+            throw new SqlExecutionException(
                     String.format(
                             "Submit continuous refresh job for materialized 
table %s occur exception.",
                             materializedTableIdentifier),
                     e);
         }
     }
 
+    private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableRefreshOperation 
alterMaterializedTableRefreshOperation) {
+        ObjectIdentifier materializedTableIdentifier =
+                alterMaterializedTableRefreshOperation.getTableIdentifier();
+        ResolvedCatalogBaseTable<?> table = 
operationExecutor.getTable(materializedTableIdentifier);
+        if (MATERIALIZED_TABLE != table.getTableKind()) {
+            throw new ValidationException(
+                    String.format(
+                            "The table '%s' is not a materialized table.",
+                            materializedTableIdentifier));
+        }
+
+        ResolvedCatalogMaterializedTable materializedTable =
+                (ResolvedCatalogMaterializedTable) table;
+
+        Map<String, String> partitionSpec =
+                alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+        validatePartitionSpec(partitionSpec, 
(ResolvedCatalogMaterializedTable) table);
+
+        // Set job name, runtime mode
+        Configuration customConfig = new Configuration();
+        String jobName =
+                String.format(
+                        "Materialized_table_%s_one_time_refresh_job",
+                        materializedTableIdentifier.asSerializableString());
+        customConfig.set(NAME, jobName);
+        customConfig.set(RUNTIME_MODE, BATCH);
+
+        StringBuilder insertStatement =
+                new StringBuilder(
+                        String.format(
+                                "INSERT OVERWRITE %s SELECT * FROM (%s)",
+                                materializedTableIdentifier,
+                                materializedTable.getDefinitionQuery()));
+
+        if (!partitionSpec.isEmpty()) {
+            insertStatement.append(" WHERE ");

Review Comment:
   I think the WHERE clause would be better formatted on a new line.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode(
                     "Submit continuous refresh job for materialized table {} 
occur exception.",
                     materializedTableIdentifier,
                     e);
-            throw new TableException(
+            throw new SqlExecutionException(
                     String.format(
                             "Submit continuous refresh job for materialized 
table %s occur exception.",
                             materializedTableIdentifier),
                     e);
         }
     }
 
+    private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableRefreshOperation 
alterMaterializedTableRefreshOperation) {
+        ObjectIdentifier materializedTableIdentifier =
+                alterMaterializedTableRefreshOperation.getTableIdentifier();
+        ResolvedCatalogBaseTable<?> table = 
operationExecutor.getTable(materializedTableIdentifier);
+        if (MATERIALIZED_TABLE != table.getTableKind()) {
+            throw new ValidationException(
+                    String.format(
+                            "The table '%s' is not a materialized table.",
+                            materializedTableIdentifier));
+        }
+
+        ResolvedCatalogMaterializedTable materializedTable =
+                (ResolvedCatalogMaterializedTable) table;
+
+        Map<String, String> partitionSpec =
+                alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+        validatePartitionSpec(partitionSpec, 
(ResolvedCatalogMaterializedTable) table);
+
+        // Set job name, runtime mode
+        Configuration customConfig = new Configuration();
+        String jobName =
+                String.format(
+                        "Materialized_table_%s_one_time_refresh_job",
+                        materializedTableIdentifier.asSerializableString());
+        customConfig.set(NAME, jobName);
+        customConfig.set(RUNTIME_MODE, BATCH);
+
+        StringBuilder insertStatement =
+                new StringBuilder(
+                        String.format(
+                                "INSERT OVERWRITE %s SELECT * FROM (%s)",

Review Comment:
   It would be nice to put the SELECT statement on a new line.
   
   Moreover, could you extract this insert-statement generation logic into a util 
method, so that we can test it separately?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode(
                     "Submit continuous refresh job for materialized table {} 
occur exception.",
                     materializedTableIdentifier,
                     e);
-            throw new TableException(
+            throw new SqlExecutionException(
                     String.format(
                             "Submit continuous refresh job for materialized 
table %s occur exception.",
                             materializedTableIdentifier),
                     e);
         }
     }
 
+    private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableRefreshOperation 
alterMaterializedTableRefreshOperation) {
+        ObjectIdentifier materializedTableIdentifier =
+                alterMaterializedTableRefreshOperation.getTableIdentifier();
+        ResolvedCatalogBaseTable<?> table = 
operationExecutor.getTable(materializedTableIdentifier);
+        if (MATERIALIZED_TABLE != table.getTableKind()) {
+            throw new ValidationException(
+                    String.format(
+                            "The table '%s' is not a materialized table.",
+                            materializedTableIdentifier));
+        }
+
+        ResolvedCatalogMaterializedTable materializedTable =
+                (ResolvedCatalogMaterializedTable) table;
+
+        Map<String, String> partitionSpec =
+                alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+        validatePartitionSpec(partitionSpec, 
(ResolvedCatalogMaterializedTable) table);
+
+        // Set job name, runtime mode
+        Configuration customConfig = new Configuration();
+        String jobName =
+                String.format(
+                        "Materialized_table_%s_one_time_refresh_job",
+                        materializedTableIdentifier.asSerializableString());
+        customConfig.set(NAME, jobName);
+        customConfig.set(RUNTIME_MODE, BATCH);
+
+        StringBuilder insertStatement =
+                new StringBuilder(
+                        String.format(
+                                "INSERT OVERWRITE %s SELECT * FROM (%s)",
+                                materializedTableIdentifier,
+                                materializedTable.getDefinitionQuery()));
+
+        if (!partitionSpec.isEmpty()) {
+            insertStatement.append(" WHERE ");
+            insertStatement.append(
+                    partitionSpec.entrySet().stream()
+                            .map(
+                                    entry ->
+                                            String.format(
+                                                    "%s = '%s'", 
entry.getKey(), entry.getValue()))
+                            .reduce((s1, s2) -> s1 + " AND " + s2)
+                            .get());
+        }
+
+        try {
+            LOG.debug(
+                    "Begin to manually refreshing the materialization table 
{}, statement: {}",
+                    materializedTableIdentifier,
+                    insertStatement);
+            return operationExecutor.executeStatement(
+                    handle, customConfig, insertStatement.toString());
+        } catch (Exception e) {
+            // log and throw exception
+            LOG.error(
+                    "Manually refreshing the materialization table {} occur 
exception.",
+                    materializedTableIdentifier,
+                    e);
+            throw new SqlExecutionException(
+                    String.format(
+                            "Manually refreshing the materialization table %s 
occur exception.",
+                            materializedTableIdentifier),
+                    e);
+        }
+    }
+
+    private static void validatePartitionSpec(
+            Map<String, String> partitionSpec, 
ResolvedCatalogMaterializedTable table) {
+        ResolvedSchema schema = table.getResolvedSchema();
+        Set<String> allPartitionKeys = new HashSet<>(table.getPartitionKeys());
+
+        Set<String> unknownPartitionKeys = new HashSet<>();
+        Set<String> nonStringPartitionKeys = new HashSet<>();
+
+        for (String partitionKey : partitionSpec.keySet()) {
+            if (!schema.getColumn(partitionKey).isPresent()) {
+                unknownPartitionKeys.add(partitionKey);
+                continue;
+            }
+
+            if (!schema.getColumn(partitionKey)
+                    .get()
+                    .getDataType()
+                    .getLogicalType()
+                    .getTypeRoot()
+                    .getFamilies()
+                    .contains(LogicalTypeFamily.CHARACTER_STRING)) {
+                nonStringPartitionKeys.add(partitionKey);
+            }
+        }
+
+        if (!unknownPartitionKeys.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "The partition spec contains unknown partition 
keys: [%s]. All known partition keys are: [%s].",
+                            unknownPartitionKeys.stream()
+                                    .collect(Collectors.joining("', '", "'", 
"'")),
+                            allPartitionKeys.stream()
+                                    .collect(Collectors.joining("', '", "'", 
"'"))));
+        }
+
+        if (!nonStringPartitionKeys.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "Currently, specifying non-char or non-string type 
partition fields"

Review Comment:
   What about using the following exception msg:
   "Currently, manual refreshing materialized table only supports specifying 
char and string type partition keys. All specific partition keys with 
unsupported types are:\n\n %s."



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode(
                     "Submit continuous refresh job for materialized table {} 
occur exception.",
                     materializedTableIdentifier,
                     e);
-            throw new TableException(
+            throw new SqlExecutionException(
                     String.format(
                             "Submit continuous refresh job for materialized 
table %s occur exception.",
                             materializedTableIdentifier),
                     e);
         }
     }
 
+    private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableRefreshOperation 
alterMaterializedTableRefreshOperation) {
+        ObjectIdentifier materializedTableIdentifier =
+                alterMaterializedTableRefreshOperation.getTableIdentifier();
+        ResolvedCatalogBaseTable<?> table = 
operationExecutor.getTable(materializedTableIdentifier);
+        if (MATERIALIZED_TABLE != table.getTableKind()) {
+            throw new ValidationException(
+                    String.format(
+                            "The table '%s' is not a materialized table.",
+                            materializedTableIdentifier));
+        }
+
+        ResolvedCatalogMaterializedTable materializedTable =
+                (ResolvedCatalogMaterializedTable) table;
+
+        Map<String, String> partitionSpec =
+                alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+        validatePartitionSpec(partitionSpec, 
(ResolvedCatalogMaterializedTable) table);
+
+        // Set job name, runtime mode
+        Configuration customConfig = new Configuration();
+        String jobName =
+                String.format(
+                        "Materialized_table_%s_one_time_refresh_job",
+                        materializedTableIdentifier.asSerializableString());
+        customConfig.set(NAME, jobName);
+        customConfig.set(RUNTIME_MODE, BATCH);
+
+        StringBuilder insertStatement =
+                new StringBuilder(
+                        String.format(
+                                "INSERT OVERWRITE %s SELECT * FROM (%s)",
+                                materializedTableIdentifier,
+                                materializedTable.getDefinitionQuery()));
+
+        if (!partitionSpec.isEmpty()) {
+            insertStatement.append(" WHERE ");
+            insertStatement.append(
+                    partitionSpec.entrySet().stream()
+                            .map(
+                                    entry ->
+                                            String.format(
+                                                    "%s = '%s'", 
entry.getKey(), entry.getValue()))
+                            .reduce((s1, s2) -> s1 + " AND " + s2)
+                            .get());
+        }
+
+        try {
+            LOG.debug(
+                    "Begin to manually refreshing the materialization table 
{}, statement: {}",
+                    materializedTableIdentifier,
+                    insertStatement);
+            return operationExecutor.executeStatement(
+                    handle, customConfig, insertStatement.toString());
+        } catch (Exception e) {
+            // log and throw exception
+            LOG.error(
+                    "Manually refreshing the materialization table {} occur 
exception.",
+                    materializedTableIdentifier,
+                    e);
+            throw new SqlExecutionException(
+                    String.format(
+                            "Manually refreshing the materialization table %s 
occur exception.",
+                            materializedTableIdentifier),
+                    e);
+        }
+    }
+
+    private static void validatePartitionSpec(
+            Map<String, String> partitionSpec, 
ResolvedCatalogMaterializedTable table) {
+        ResolvedSchema schema = table.getResolvedSchema();
+        Set<String> allPartitionKeys = new HashSet<>(table.getPartitionKeys());
+
+        Set<String> unknownPartitionKeys = new HashSet<>();
+        Set<String> nonStringPartitionKeys = new HashSet<>();
+
+        for (String partitionKey : partitionSpec.keySet()) {
+            if (!schema.getColumn(partitionKey).isPresent()) {
+                unknownPartitionKeys.add(partitionKey);
+                continue;
+            }
+
+            if (!schema.getColumn(partitionKey)
+                    .get()
+                    .getDataType()
+                    .getLogicalType()
+                    .getTypeRoot()
+                    .getFamilies()
+                    .contains(LogicalTypeFamily.CHARACTER_STRING)) {
+                nonStringPartitionKeys.add(partitionKey);
+            }
+        }
+
+        if (!unknownPartitionKeys.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "The partition spec contains unknown partition 
keys: [%s]. All known partition keys are: [%s].",
+                            unknownPartitionKeys.stream()
+                                    .collect(Collectors.joining("', '", "'", 
"'")),
+                            allPartitionKeys.stream()
+                                    .collect(Collectors.joining("', '", "'", 
"'"))));
+        }
+
+        if (!nonStringPartitionKeys.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "Currently, specifying non-char or non-string type 
partition fields"
+                                    + " to refresh materialized tables is not 
supported."
+                                    + " All specific partition keys with 
unsupported types are: [%s].",

Review Comment:
   ditto



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode(
                     "Submit continuous refresh job for materialized table {} 
occur exception.",
                     materializedTableIdentifier,
                     e);
-            throw new TableException(
+            throw new SqlExecutionException(
                     String.format(
                             "Submit continuous refresh job for materialized 
table %s occur exception.",
                             materializedTableIdentifier),
                     e);
         }
     }
 
+    private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableRefreshOperation 
alterMaterializedTableRefreshOperation) {
+        ObjectIdentifier materializedTableIdentifier =
+                alterMaterializedTableRefreshOperation.getTableIdentifier();
+        ResolvedCatalogBaseTable<?> table = 
operationExecutor.getTable(materializedTableIdentifier);
+        if (MATERIALIZED_TABLE != table.getTableKind()) {
+            throw new ValidationException(
+                    String.format(
+                            "The table '%s' is not a materialized table.",
+                            materializedTableIdentifier));
+        }
+
+        ResolvedCatalogMaterializedTable materializedTable =
+                (ResolvedCatalogMaterializedTable) table;
+
+        Map<String, String> partitionSpec =
+                alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+        validatePartitionSpec(partitionSpec, 
(ResolvedCatalogMaterializedTable) table);
+
+        // Set job name, runtime mode
+        Configuration customConfig = new Configuration();
+        String jobName =
+                String.format(
+                        "Materialized_table_%s_one_time_refresh_job",
+                        materializedTableIdentifier.asSerializableString());
+        customConfig.set(NAME, jobName);
+        customConfig.set(RUNTIME_MODE, BATCH);
+
+        StringBuilder insertStatement =
+                new StringBuilder(
+                        String.format(
+                                "INSERT OVERWRITE %s SELECT * FROM (%s)",
+                                materializedTableIdentifier,
+                                materializedTable.getDefinitionQuery()));
+
+        if (!partitionSpec.isEmpty()) {
+            insertStatement.append(" WHERE ");
+            insertStatement.append(
+                    partitionSpec.entrySet().stream()
+                            .map(
+                                    entry ->
+                                            String.format(
+                                                    "%s = '%s'", 
entry.getKey(), entry.getValue()))
+                            .reduce((s1, s2) -> s1 + " AND " + s2)
+                            .get());
+        }
+
+        try {
+            LOG.debug(
+                    "Begin to manually refreshing the materialization table 
{}, statement: {}",
+                    materializedTableIdentifier,
+                    insertStatement);
+            return operationExecutor.executeStatement(
+                    handle, customConfig, insertStatement.toString());
+        } catch (Exception e) {
+            // log and throw exception
+            LOG.error(
+                    "Manually refreshing the materialization table {} occur 
exception.",
+                    materializedTableIdentifier,
+                    e);
+            throw new SqlExecutionException(
+                    String.format(
+                            "Manually refreshing the materialization table %s 
occur exception.",
+                            materializedTableIdentifier),
+                    e);
+        }
+    }
+
+    private static void validatePartitionSpec(
+            Map<String, String> partitionSpec, 
ResolvedCatalogMaterializedTable table) {
+        ResolvedSchema schema = table.getResolvedSchema();
+        Set<String> allPartitionKeys = new HashSet<>(table.getPartitionKeys());
+
+        Set<String> unknownPartitionKeys = new HashSet<>();
+        Set<String> nonStringPartitionKeys = new HashSet<>();
+
+        for (String partitionKey : partitionSpec.keySet()) {
+            if (!schema.getColumn(partitionKey).isPresent()) {
+                unknownPartitionKeys.add(partitionKey);
+                continue;
+            }
+
+            if (!schema.getColumn(partitionKey)
+                    .get()
+                    .getDataType()
+                    .getLogicalType()
+                    .getTypeRoot()
+                    .getFamilies()
+                    .contains(LogicalTypeFamily.CHARACTER_STRING)) {
+                nonStringPartitionKeys.add(partitionKey);
+            }
+        }
+
+        if (!unknownPartitionKeys.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "The partition spec contains unknown partition 
keys: [%s]. All known partition keys are: [%s].",

Review Comment:
   It would be better to add "\n\n" between ":" and "[%s]". See FactoryUtil for 
more details.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode(
                     "Submit continuous refresh job for materialized table {} 
occur exception.",
                     materializedTableIdentifier,
                     e);
-            throw new TableException(
+            throw new SqlExecutionException(
                     String.format(
                             "Submit continuous refresh job for materialized 
table %s occur exception.",
                             materializedTableIdentifier),
                     e);
         }
     }
 
+    private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableRefreshOperation 
alterMaterializedTableRefreshOperation) {
+        ObjectIdentifier materializedTableIdentifier =
+                alterMaterializedTableRefreshOperation.getTableIdentifier();
+        ResolvedCatalogBaseTable<?> table = 
operationExecutor.getTable(materializedTableIdentifier);
+        if (MATERIALIZED_TABLE != table.getTableKind()) {
+            throw new ValidationException(
+                    String.format(
+                            "The table '%s' is not a materialized table.",
+                            materializedTableIdentifier));
+        }
+
+        ResolvedCatalogMaterializedTable materializedTable =
+                (ResolvedCatalogMaterializedTable) table;
+
+        Map<String, String> partitionSpec =
+                alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+        validatePartitionSpec(partitionSpec, 
(ResolvedCatalogMaterializedTable) table);
+
+        // Set job name, runtime mode
+        Configuration customConfig = new Configuration();
+        String jobName =
+                String.format(
+                        "Materialized_table_%s_one_time_refresh_job",
+                        materializedTableIdentifier.asSerializableString());
+        customConfig.set(NAME, jobName);
+        customConfig.set(RUNTIME_MODE, BATCH);
+
+        StringBuilder insertStatement =
+                new StringBuilder(
+                        String.format(
+                                "INSERT OVERWRITE %s SELECT * FROM (%s)",
+                                materializedTableIdentifier,
+                                materializedTable.getDefinitionQuery()));
+
+        if (!partitionSpec.isEmpty()) {
+            insertStatement.append(" WHERE ");
+            insertStatement.append(
+                    partitionSpec.entrySet().stream()
+                            .map(
+                                    entry ->
+                                            String.format(
+                                                    "%s = '%s'", 
entry.getKey(), entry.getValue()))
+                            .reduce((s1, s2) -> s1 + " AND " + s2)
+                            .get());
+        }
+
+        try {
+            LOG.debug(
+                    "Begin to manually refreshing the materialization table 
{}, statement: {}",
+                    materializedTableIdentifier,
+                    insertStatement);
+            return operationExecutor.executeStatement(
+                    handle, customConfig, insertStatement.toString());
+        } catch (Exception e) {
+            // log and throw exception
+            LOG.error(
+                    "Manually refreshing the materialization table {} occur 
exception.",
+                    materializedTableIdentifier,
+                    e);
+            throw new SqlExecutionException(
+                    String.format(
+                            "Manually refreshing the materialization table %s 
occur exception.",
+                            materializedTableIdentifier),
+                    e);
+        }
+    }
+
+    private static void validatePartitionSpec(
+            Map<String, String> partitionSpec, 
ResolvedCatalogMaterializedTable table) {
+        ResolvedSchema schema = table.getResolvedSchema();
+        Set<String> allPartitionKeys = new HashSet<>(table.getPartitionKeys());
+
+        Set<String> unknownPartitionKeys = new HashSet<>();
+        Set<String> nonStringPartitionKeys = new HashSet<>();
+
+        for (String partitionKey : partitionSpec.keySet()) {
+            if (!schema.getColumn(partitionKey).isPresent()) {
+                unknownPartitionKeys.add(partitionKey);
+                continue;
+            }
+
+            if (!schema.getColumn(partitionKey)
+                    .get()
+                    .getDataType()
+                    .getLogicalType()
+                    .getTypeRoot()
+                    .getFamilies()
+                    .contains(LogicalTypeFamily.CHARACTER_STRING)) {
+                nonStringPartitionKeys.add(partitionKey);
+            }
+        }
+
+        if (!unknownPartitionKeys.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "The partition spec contains unknown partition 
keys: [%s]. All known partition keys are: [%s].",
+                            unknownPartitionKeys.stream()

Review Comment:
   I think we can refer to the following logic, which uses the util method `stringifyOption`:
   ```
   options.entrySet().stream()
                                           .map(
                                                   optionEntry ->
                                                           stringifyOption(
                                                                   
optionEntry.getKey(),
                                                                   
optionEntry.getValue()))
                                           .sorted()
                                           .collect(Collectors.joining("\n")))
   ```



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -236,6 +252,213 @@ void testCreateMaterializedTableInFullMode() {
                         "Only support create materialized table in continuous 
refresh mode currently.");
     }
 
+    @Test
+    void testAlterMaterializedTableRefresh() throws Exception {
+        long timeout = Duration.ofSeconds(20).toMillis();
+        long pause = Duration.ofSeconds(2).toMillis();
+        // initialize session handle, create test-filesystem catalog and 
register it to catalog
+        // store
+        SessionHandle sessionHandle = initializeSession();
+
+        List<Row> data = new ArrayList<>();
+        data.add(Row.of(1L, 1L, 1L, "2024-01-01"));
+        data.add(Row.of(2L, 2L, 2L, "2024-01-02"));
+        data.add(Row.of(3L, 3L, 3L, "2024-01-02"));
+        String dataId = TestValuesTableFactory.registerData(data);
+
+        String sourceDdl =
+                String.format(
+                        "CREATE TABLE my_source (\n"
+                                + "  order_id BIGINT,\n"
+                                + "  user_id BIGINT,\n"
+                                + "  shop_id BIGINT,\n"
+                                + "  order_created_at STRING\n"
+                                + ")\n"
+                                + "WITH (\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'bounded' = 'true',\n"
+                                + "  'data-id' = '%s'\n"
+                                + ")",
+                        dataId);
+        service.executeStatement(sessionHandle, sourceDdl, -1, new 
Configuration());
+
+        String materializedTableDDL =
+                "CREATE MATERIALIZED TABLE my_materialized_table"
+                        + " PARTITIONED BY (ds)\n"
+                        + " WITH(\n"
+                        + "   'format' = 'debezium-json'\n"
+                        + " )\n"
+                        + " FRESHNESS = INTERVAL '2' SECOND\n"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  COUNT(order_id) AS order_cnt\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, order_created_at AS 
ds, order_id FROM my_source"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+
+        OperationHandle materializedTableHandle =
+                service.executeStatement(
+                        sessionHandle, materializedTableDDL, -1, new 
Configuration());
+        awaitOperationTermination(service, sessionHandle, 
materializedTableHandle);
+
+        // verify data exists in materialized table
+        CommonTestUtils.waitUtil(
+                () ->
+                        fetchTableData(sessionHandle, "SELECT * FROM 
my_materialized_table").size()
+                                == data.size(),
+                Duration.ofMillis(timeout),
+                Duration.ofMillis(pause),
+                "Failed to verify the data in materialized table.");
+        assertThat(
+                        fetchTableData(
+                                        sessionHandle,
+                                        "SELECT * FROM my_materialized_table 
where ds = '2024-01-02'")
+                                .size())
+                .isEqualTo(2);
+
+        // remove the last element
+        data.remove(2);
+
+        long currentTime = System.currentTimeMillis();
+        String alterStatement =
+                "ALTER MATERIALIZED TABLE my_materialized_table REFRESH 
PARTITION (ds = '2024-01-02')";
+        OperationHandle alterHandle =
+                service.executeStatement(sessionHandle, alterStatement, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, alterHandle);
+        List<RowData> result = fetchAllResults(service, sessionHandle, 
alterHandle);
+        assertThat(result.size()).isEqualTo(1);
+        String jobId = result.get(0).getString(0).toString();
+
+        MiniCluster miniCluster = MINI_CLUSTER.getMiniCluster();
+
+        // 1. verify a new job is created
+        Optional<JobStatusMessage> job =
+                miniCluster.listJobs().get(timeout, 
TimeUnit.MILLISECONDS).stream()
+                        .filter(j -> j.getJobId().toString().equals(jobId))
+                        .findFirst();
+        assertThat(job).isPresent();
+        assertThat(job.get().getStartTime()).isGreaterThan(currentTime);
+
+        // 2. verify the new job is a batch job
+        ArchivedExecutionGraph executionGraph =

Review Comment:
   We can get the JobDetailsInfo using RestClusterClient, and then use 
ObjectMapper to extract the jobType; you can refer to PR 
https://github.com/apache/flink/pull/24765/files, 
MaterializedTableStatementITCase (L244). This is one possible solution. I 
just think ArchivedExecutionGraph is too low-level an API.



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -236,6 +252,213 @@ void testCreateMaterializedTableInFullMode() {
                         "Only support create materialized table in continuous 
refresh mode currently.");
     }
 
+    @Test
+    void testAlterMaterializedTableRefresh() throws Exception {
+        long timeout = Duration.ofSeconds(20).toMillis();
+        long pause = Duration.ofSeconds(2).toMillis();
+        // initialize session handle, create test-filesystem catalog and 
register it to catalog
+        // store
+        SessionHandle sessionHandle = initializeSession();
+
+        List<Row> data = new ArrayList<>();
+        data.add(Row.of(1L, 1L, 1L, "2024-01-01"));
+        data.add(Row.of(2L, 2L, 2L, "2024-01-02"));
+        data.add(Row.of(3L, 3L, 3L, "2024-01-02"));
+        String dataId = TestValuesTableFactory.registerData(data);
+
+        String sourceDdl =
+                String.format(
+                        "CREATE TABLE my_source (\n"
+                                + "  order_id BIGINT,\n"
+                                + "  user_id BIGINT,\n"
+                                + "  shop_id BIGINT,\n"
+                                + "  order_created_at STRING\n"
+                                + ")\n"
+                                + "WITH (\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'bounded' = 'true',\n"
+                                + "  'data-id' = '%s'\n"
+                                + ")",
+                        dataId);
+        service.executeStatement(sessionHandle, sourceDdl, -1, new 
Configuration());

Review Comment:
   We should wait until this statement has finished before creating the 
materialized table.



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -236,6 +252,213 @@ void testCreateMaterializedTableInFullMode() {
                         "Only support create materialized table in continuous 
refresh mode currently.");
     }
 
+    @Test
+    void testAlterMaterializedTableRefresh() throws Exception {
+        long timeout = Duration.ofSeconds(20).toMillis();
+        long pause = Duration.ofSeconds(2).toMillis();
+        // initialize session handle, create test-filesystem catalog and 
register it to catalog
+        // store
+        SessionHandle sessionHandle = initializeSession();
+
+        List<Row> data = new ArrayList<>();
+        data.add(Row.of(1L, 1L, 1L, "2024-01-01"));
+        data.add(Row.of(2L, 2L, 2L, "2024-01-02"));
+        data.add(Row.of(3L, 3L, 3L, "2024-01-02"));
+        String dataId = TestValuesTableFactory.registerData(data);
+
+        String sourceDdl =
+                String.format(
+                        "CREATE TABLE my_source (\n"
+                                + "  order_id BIGINT,\n"
+                                + "  user_id BIGINT,\n"
+                                + "  shop_id BIGINT,\n"
+                                + "  order_created_at STRING\n"
+                                + ")\n"
+                                + "WITH (\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'bounded' = 'true',\n"
+                                + "  'data-id' = '%s'\n"
+                                + ")",
+                        dataId);
+        service.executeStatement(sessionHandle, sourceDdl, -1, new 
Configuration());
+
+        String materializedTableDDL =
+                "CREATE MATERIALIZED TABLE my_materialized_table"
+                        + " PARTITIONED BY (ds)\n"
+                        + " WITH(\n"
+                        + "   'format' = 'debezium-json'\n"
+                        + " )\n"
+                        + " FRESHNESS = INTERVAL '2' SECOND\n"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  COUNT(order_id) AS order_cnt\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, order_created_at AS 
ds, order_id FROM my_source"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+
+        OperationHandle materializedTableHandle =
+                service.executeStatement(
+                        sessionHandle, materializedTableDDL, -1, new 
Configuration());
+        awaitOperationTermination(service, sessionHandle, 
materializedTableHandle);
+
+        // verify data exists in materialized table
+        CommonTestUtils.waitUtil(
+                () ->
+                        fetchTableData(sessionHandle, "SELECT * FROM 
my_materialized_table").size()
+                                == data.size(),
+                Duration.ofMillis(timeout),
+                Duration.ofMillis(pause),
+                "Failed to verify the data in materialized table.");
+        assertThat(
+                        fetchTableData(
+                                        sessionHandle,
+                                        "SELECT * FROM my_materialized_table 
where ds = '2024-01-02'")
+                                .size())
+                .isEqualTo(2);
+
+        // remove the last element
+        data.remove(2);
+
+        long currentTime = System.currentTimeMillis();
+        String alterStatement =
+                "ALTER MATERIALIZED TABLE my_materialized_table REFRESH 
PARTITION (ds = '2024-01-02')";
+        OperationHandle alterHandle =
+                service.executeStatement(sessionHandle, alterStatement, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, alterHandle);
+        List<RowData> result = fetchAllResults(service, sessionHandle, 
alterHandle);
+        assertThat(result.size()).isEqualTo(1);
+        String jobId = result.get(0).getString(0).toString();
+
+        MiniCluster miniCluster = MINI_CLUSTER.getMiniCluster();

Review Comment:
   Why not use `@InjectClusterClient RestClusterClient<?> restClusterClient` 
directly? I think using MiniCluster is too heavyweight here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to