lsyldliu commented on code in PR #24760: URL: https://github.com/apache/flink/pull/24760#discussion_r1597326389
########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode( "Submit continuous refresh job for materialized table {} occur exception.", materializedTableIdentifier, e); - throw new TableException( + throw new SqlExecutionException( String.format( "Submit continuous refresh job for materialized table %s occur exception.", materializedTableIdentifier), e); } } + private static ResultFetcher callAlterMaterializedTableRefreshOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { + ObjectIdentifier materializedTableIdentifier = + alterMaterializedTableRefreshOperation.getTableIdentifier(); + ResolvedCatalogBaseTable<?> table = operationExecutor.getTable(materializedTableIdentifier); + if (MATERIALIZED_TABLE != table.getTableKind()) { + throw new ValidationException( + String.format( + "The table '%s' is not a materialized table.", + materializedTableIdentifier)); + } + + ResolvedCatalogMaterializedTable materializedTable = + (ResolvedCatalogMaterializedTable) table; + + Map<String, String> partitionSpec = + alterMaterializedTableRefreshOperation.getPartitionSpec(); + + validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table); + + // Set job name, runtime mode + Configuration customConfig = new Configuration(); + String jobName = + String.format( + "Materialized_table_%s_one_time_refresh_job", + materializedTableIdentifier.asSerializableString()); + customConfig.set(NAME, jobName); + customConfig.set(RUNTIME_MODE, BATCH); + + StringBuilder insertStatement = + new StringBuilder( + String.format( + "INSERT OVERWRITE %s SELECT * FROM (%s)", + materializedTableIdentifier, + materializedTable.getDefinitionQuery())); + + if (!partitionSpec.isEmpty()) { + insertStatement.append(" 
WHERE "); Review Comment: I think the WHERE clause could be better formatted on a new line ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode( "Submit continuous refresh job for materialized table {} occur exception.", materializedTableIdentifier, e); - throw new TableException( + throw new SqlExecutionException( String.format( "Submit continuous refresh job for materialized table %s occur exception.", materializedTableIdentifier), e); } } + private static ResultFetcher callAlterMaterializedTableRefreshOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { + ObjectIdentifier materializedTableIdentifier = + alterMaterializedTableRefreshOperation.getTableIdentifier(); + ResolvedCatalogBaseTable<?> table = operationExecutor.getTable(materializedTableIdentifier); + if (MATERIALIZED_TABLE != table.getTableKind()) { + throw new ValidationException( + String.format( + "The table '%s' is not a materialized table.", + materializedTableIdentifier)); + } + + ResolvedCatalogMaterializedTable materializedTable = + (ResolvedCatalogMaterializedTable) table; + + Map<String, String> partitionSpec = + alterMaterializedTableRefreshOperation.getPartitionSpec(); + + validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table); + + // Set job name, runtime mode + Configuration customConfig = new Configuration(); + String jobName = + String.format( + "Materialized_table_%s_one_time_refresh_job", + materializedTableIdentifier.asSerializableString()); + customConfig.set(NAME, jobName); + customConfig.set(RUNTIME_MODE, BATCH); + + StringBuilder insertStatement = + new StringBuilder( + String.format( + "INSERT OVERWRITE %s SELECT * FROM (%s)", Review Comment: It would be nice to put the SELECT 
statement on a new line. Moreover, can you extract this insert-statement generation logic into a util method? That would help us test it separately. ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode( "Submit continuous refresh job for materialized table {} occur exception.", materializedTableIdentifier, e); - throw new TableException( + throw new SqlExecutionException( String.format( "Submit continuous refresh job for materialized table %s occur exception.", materializedTableIdentifier), e); } } + private static ResultFetcher callAlterMaterializedTableRefreshOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { + ObjectIdentifier materializedTableIdentifier = + alterMaterializedTableRefreshOperation.getTableIdentifier(); + ResolvedCatalogBaseTable<?> table = operationExecutor.getTable(materializedTableIdentifier); + if (MATERIALIZED_TABLE != table.getTableKind()) { + throw new ValidationException( + String.format( + "The table '%s' is not a materialized table.", + materializedTableIdentifier)); + } + + ResolvedCatalogMaterializedTable materializedTable = + (ResolvedCatalogMaterializedTable) table; + + Map<String, String> partitionSpec = + alterMaterializedTableRefreshOperation.getPartitionSpec(); + + validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table); + + // Set job name, runtime mode + Configuration customConfig = new Configuration(); + String jobName = + String.format( + "Materialized_table_%s_one_time_refresh_job", + materializedTableIdentifier.asSerializableString()); + customConfig.set(NAME, jobName); + customConfig.set(RUNTIME_MODE, BATCH); + + StringBuilder insertStatement = + new StringBuilder( + String.format( + "INSERT OVERWRITE %s SELECT * FROM 
(%s)", + materializedTableIdentifier, + materializedTable.getDefinitionQuery())); + + if (!partitionSpec.isEmpty()) { + insertStatement.append(" WHERE "); + insertStatement.append( + partitionSpec.entrySet().stream() + .map( + entry -> + String.format( + "%s = '%s'", entry.getKey(), entry.getValue())) + .reduce((s1, s2) -> s1 + " AND " + s2) + .get()); + } + + try { + LOG.debug( + "Begin to manually refreshing the materialization table {}, statement: {}", + materializedTableIdentifier, + insertStatement); + return operationExecutor.executeStatement( + handle, customConfig, insertStatement.toString()); + } catch (Exception e) { + // log and throw exception + LOG.error( + "Manually refreshing the materialization table {} occur exception.", + materializedTableIdentifier, + e); + throw new SqlExecutionException( + String.format( + "Manually refreshing the materialization table %s occur exception.", + materializedTableIdentifier), + e); + } + } + + private static void validatePartitionSpec( + Map<String, String> partitionSpec, ResolvedCatalogMaterializedTable table) { + ResolvedSchema schema = table.getResolvedSchema(); + Set<String> allPartitionKeys = new HashSet<>(table.getPartitionKeys()); + + Set<String> unknownPartitionKeys = new HashSet<>(); + Set<String> nonStringPartitionKeys = new HashSet<>(); + + for (String partitionKey : partitionSpec.keySet()) { + if (!schema.getColumn(partitionKey).isPresent()) { + unknownPartitionKeys.add(partitionKey); + continue; + } + + if (!schema.getColumn(partitionKey) + .get() + .getDataType() + .getLogicalType() + .getTypeRoot() + .getFamilies() + .contains(LogicalTypeFamily.CHARACTER_STRING)) { + nonStringPartitionKeys.add(partitionKey); + } + } + + if (!unknownPartitionKeys.isEmpty()) { + throw new ValidationException( + String.format( + "The partition spec contains unknown partition keys: [%s]. 
All known partition keys are: [%s].", + unknownPartitionKeys.stream() + .collect(Collectors.joining("', '", "'", "'")), + allPartitionKeys.stream() + .collect(Collectors.joining("', '", "'", "'")))); + } + + if (!nonStringPartitionKeys.isEmpty()) { + throw new ValidationException( + String.format( + "Currently, specifying non-char or non-string type partition fields" Review Comment: What about using the following exception msg: "Currently, manual refreshing materialized table only supports specifying char and string type partition keys. All specific partition keys with unsupported types are:\n\n %s." ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode( "Submit continuous refresh job for materialized table {} occur exception.", materializedTableIdentifier, e); - throw new TableException( + throw new SqlExecutionException( String.format( "Submit continuous refresh job for materialized table %s occur exception.", materializedTableIdentifier), e); } } + private static ResultFetcher callAlterMaterializedTableRefreshOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { + ObjectIdentifier materializedTableIdentifier = + alterMaterializedTableRefreshOperation.getTableIdentifier(); + ResolvedCatalogBaseTable<?> table = operationExecutor.getTable(materializedTableIdentifier); + if (MATERIALIZED_TABLE != table.getTableKind()) { + throw new ValidationException( + String.format( + "The table '%s' is not a materialized table.", + materializedTableIdentifier)); + } + + ResolvedCatalogMaterializedTable materializedTable = + (ResolvedCatalogMaterializedTable) table; + + Map<String, String> partitionSpec = + alterMaterializedTableRefreshOperation.getPartitionSpec(); + + 
validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table); + + // Set job name, runtime mode + Configuration customConfig = new Configuration(); + String jobName = + String.format( + "Materialized_table_%s_one_time_refresh_job", + materializedTableIdentifier.asSerializableString()); + customConfig.set(NAME, jobName); + customConfig.set(RUNTIME_MODE, BATCH); + + StringBuilder insertStatement = + new StringBuilder( + String.format( + "INSERT OVERWRITE %s SELECT * FROM (%s)", + materializedTableIdentifier, + materializedTable.getDefinitionQuery())); + + if (!partitionSpec.isEmpty()) { + insertStatement.append(" WHERE "); + insertStatement.append( + partitionSpec.entrySet().stream() + .map( + entry -> + String.format( + "%s = '%s'", entry.getKey(), entry.getValue())) + .reduce((s1, s2) -> s1 + " AND " + s2) + .get()); + } + + try { + LOG.debug( + "Begin to manually refreshing the materialization table {}, statement: {}", + materializedTableIdentifier, + insertStatement); + return operationExecutor.executeStatement( + handle, customConfig, insertStatement.toString()); + } catch (Exception e) { + // log and throw exception + LOG.error( + "Manually refreshing the materialization table {} occur exception.", + materializedTableIdentifier, + e); + throw new SqlExecutionException( + String.format( + "Manually refreshing the materialization table %s occur exception.", + materializedTableIdentifier), + e); + } + } + + private static void validatePartitionSpec( + Map<String, String> partitionSpec, ResolvedCatalogMaterializedTable table) { + ResolvedSchema schema = table.getResolvedSchema(); + Set<String> allPartitionKeys = new HashSet<>(table.getPartitionKeys()); + + Set<String> unknownPartitionKeys = new HashSet<>(); + Set<String> nonStringPartitionKeys = new HashSet<>(); + + for (String partitionKey : partitionSpec.keySet()) { + if (!schema.getColumn(partitionKey).isPresent()) { + unknownPartitionKeys.add(partitionKey); + continue; + } + + if 
(!schema.getColumn(partitionKey) + .get() + .getDataType() + .getLogicalType() + .getTypeRoot() + .getFamilies() + .contains(LogicalTypeFamily.CHARACTER_STRING)) { + nonStringPartitionKeys.add(partitionKey); + } + } + + if (!unknownPartitionKeys.isEmpty()) { + throw new ValidationException( + String.format( + "The partition spec contains unknown partition keys: [%s]. All known partition keys are: [%s].", + unknownPartitionKeys.stream() + .collect(Collectors.joining("', '", "'", "'")), + allPartitionKeys.stream() + .collect(Collectors.joining("', '", "'", "'")))); + } + + if (!nonStringPartitionKeys.isEmpty()) { + throw new ValidationException( + String.format( + "Currently, specifying non-char or non-string type partition fields" + + " to refresh materialized tables is not supported." + + " All specific partition keys with unsupported types are: [%s].", Review Comment: ditto ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode( "Submit continuous refresh job for materialized table {} occur exception.", materializedTableIdentifier, e); - throw new TableException( + throw new SqlExecutionException( String.format( "Submit continuous refresh job for materialized table %s occur exception.", materializedTableIdentifier), e); } } + private static ResultFetcher callAlterMaterializedTableRefreshOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { + ObjectIdentifier materializedTableIdentifier = + alterMaterializedTableRefreshOperation.getTableIdentifier(); + ResolvedCatalogBaseTable<?> table = operationExecutor.getTable(materializedTableIdentifier); + if (MATERIALIZED_TABLE != table.getTableKind()) { + throw new ValidationException( + String.format( + "The table '%s' is not a materialized 
table.", + materializedTableIdentifier)); + } + + ResolvedCatalogMaterializedTable materializedTable = + (ResolvedCatalogMaterializedTable) table; + + Map<String, String> partitionSpec = + alterMaterializedTableRefreshOperation.getPartitionSpec(); + + validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table); + + // Set job name, runtime mode + Configuration customConfig = new Configuration(); + String jobName = + String.format( + "Materialized_table_%s_one_time_refresh_job", + materializedTableIdentifier.asSerializableString()); + customConfig.set(NAME, jobName); + customConfig.set(RUNTIME_MODE, BATCH); + + StringBuilder insertStatement = + new StringBuilder( + String.format( + "INSERT OVERWRITE %s SELECT * FROM (%s)", + materializedTableIdentifier, + materializedTable.getDefinitionQuery())); + + if (!partitionSpec.isEmpty()) { + insertStatement.append(" WHERE "); + insertStatement.append( + partitionSpec.entrySet().stream() + .map( + entry -> + String.format( + "%s = '%s'", entry.getKey(), entry.getValue())) + .reduce((s1, s2) -> s1 + " AND " + s2) + .get()); + } + + try { + LOG.debug( + "Begin to manually refreshing the materialization table {}, statement: {}", + materializedTableIdentifier, + insertStatement); + return operationExecutor.executeStatement( + handle, customConfig, insertStatement.toString()); + } catch (Exception e) { + // log and throw exception + LOG.error( + "Manually refreshing the materialization table {} occur exception.", + materializedTableIdentifier, + e); + throw new SqlExecutionException( + String.format( + "Manually refreshing the materialization table %s occur exception.", + materializedTableIdentifier), + e); + } + } + + private static void validatePartitionSpec( + Map<String, String> partitionSpec, ResolvedCatalogMaterializedTable table) { + ResolvedSchema schema = table.getResolvedSchema(); + Set<String> allPartitionKeys = new HashSet<>(table.getPartitionKeys()); + + Set<String> unknownPartitionKeys = new 
HashSet<>(); + Set<String> nonStringPartitionKeys = new HashSet<>(); + + for (String partitionKey : partitionSpec.keySet()) { + if (!schema.getColumn(partitionKey).isPresent()) { + unknownPartitionKeys.add(partitionKey); + continue; + } + + if (!schema.getColumn(partitionKey) + .get() + .getDataType() + .getLogicalType() + .getTypeRoot() + .getFamilies() + .contains(LogicalTypeFamily.CHARACTER_STRING)) { + nonStringPartitionKeys.add(partitionKey); + } + } + + if (!unknownPartitionKeys.isEmpty()) { + throw new ValidationException( + String.format( + "The partition spec contains unknown partition keys: [%s]. All known partition keys are: [%s].", Review Comment: It would be better to add "\n\n" between ":" and "[%s]", See FactoryUtil for more detail. ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode( "Submit continuous refresh job for materialized table {} occur exception.", materializedTableIdentifier, e); - throw new TableException( + throw new SqlExecutionException( String.format( "Submit continuous refresh job for materialized table %s occur exception.", materializedTableIdentifier), e); } } + private static ResultFetcher callAlterMaterializedTableRefreshOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { + ObjectIdentifier materializedTableIdentifier = + alterMaterializedTableRefreshOperation.getTableIdentifier(); + ResolvedCatalogBaseTable<?> table = operationExecutor.getTable(materializedTableIdentifier); + if (MATERIALIZED_TABLE != table.getTableKind()) { + throw new ValidationException( + String.format( + "The table '%s' is not a materialized table.", + materializedTableIdentifier)); + } + + ResolvedCatalogMaterializedTable materializedTable = + 
(ResolvedCatalogMaterializedTable) table; + + Map<String, String> partitionSpec = + alterMaterializedTableRefreshOperation.getPartitionSpec(); + + validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table); + + // Set job name, runtime mode + Configuration customConfig = new Configuration(); + String jobName = + String.format( + "Materialized_table_%s_one_time_refresh_job", + materializedTableIdentifier.asSerializableString()); + customConfig.set(NAME, jobName); + customConfig.set(RUNTIME_MODE, BATCH); + + StringBuilder insertStatement = + new StringBuilder( + String.format( + "INSERT OVERWRITE %s SELECT * FROM (%s)", + materializedTableIdentifier, + materializedTable.getDefinitionQuery())); + + if (!partitionSpec.isEmpty()) { + insertStatement.append(" WHERE "); + insertStatement.append( + partitionSpec.entrySet().stream() + .map( + entry -> + String.format( + "%s = '%s'", entry.getKey(), entry.getValue())) + .reduce((s1, s2) -> s1 + " AND " + s2) + .get()); + } + + try { + LOG.debug( + "Begin to manually refreshing the materialization table {}, statement: {}", + materializedTableIdentifier, + insertStatement); + return operationExecutor.executeStatement( + handle, customConfig, insertStatement.toString()); + } catch (Exception e) { + // log and throw exception + LOG.error( + "Manually refreshing the materialization table {} occur exception.", + materializedTableIdentifier, + e); + throw new SqlExecutionException( + String.format( + "Manually refreshing the materialization table %s occur exception.", + materializedTableIdentifier), + e); + } + } + + private static void validatePartitionSpec( + Map<String, String> partitionSpec, ResolvedCatalogMaterializedTable table) { + ResolvedSchema schema = table.getResolvedSchema(); + Set<String> allPartitionKeys = new HashSet<>(table.getPartitionKeys()); + + Set<String> unknownPartitionKeys = new HashSet<>(); + Set<String> nonStringPartitionKeys = new HashSet<>(); + + for (String partitionKey : 
partitionSpec.keySet()) { + if (!schema.getColumn(partitionKey).isPresent()) { + unknownPartitionKeys.add(partitionKey); + continue; + } + + if (!schema.getColumn(partitionKey) + .get() + .getDataType() + .getLogicalType() + .getTypeRoot() + .getFamilies() + .contains(LogicalTypeFamily.CHARACTER_STRING)) { + nonStringPartitionKeys.add(partitionKey); + } + } + + if (!unknownPartitionKeys.isEmpty()) { + throw new ValidationException( + String.format( + "The partition spec contains unknown partition keys: [%s]. All known partition keys are: [%s].", + unknownPartitionKeys.stream() Review Comment: I think we can refer to the following logic, util `stringifyOption` method. ``` options.entrySet().stream() .map( optionEntry -> stringifyOption( optionEntry.getKey(), optionEntry.getValue())) .sorted() .collect(Collectors.joining("\n"))) ``` ########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ########## @@ -236,6 +252,213 @@ void testCreateMaterializedTableInFullMode() { "Only support create materialized table in continuous refresh mode currently."); } + @Test + void testAlterMaterializedTableRefresh() throws Exception { + long timeout = Duration.ofSeconds(20).toMillis(); + long pause = Duration.ofSeconds(2).toMillis(); + // initialize session handle, create test-filesystem catalog and register it to catalog + // store + SessionHandle sessionHandle = initializeSession(); + + List<Row> data = new ArrayList<>(); + data.add(Row.of(1L, 1L, 1L, "2024-01-01")); + data.add(Row.of(2L, 2L, 2L, "2024-01-02")); + data.add(Row.of(3L, 3L, 3L, "2024-01-02")); + String dataId = TestValuesTableFactory.registerData(data); + + String sourceDdl = + String.format( + "CREATE TABLE my_source (\n" + + " order_id BIGINT,\n" + + " user_id BIGINT,\n" + + " shop_id BIGINT,\n" + + " order_created_at STRING\n" + + ")\n" + + "WITH (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = 'true',\n" + + " 'data-id' = '%s'\n" + + 
")", + dataId); + service.executeStatement(sessionHandle, sourceDdl, -1, new Configuration()); + + String materializedTableDDL = + "CREATE MATERIALIZED TABLE my_materialized_table" + + " PARTITIONED BY (ds)\n" + + " WITH(\n" + + " 'format' = 'debezium-json'\n" + + " )\n" + + " FRESHNESS = INTERVAL '2' SECOND\n" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" + + " COUNT(order_id) AS order_cnt\n" + + " FROM (\n" + + " SELECT user_id, shop_id, order_created_at AS ds, order_id FROM my_source" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + + OperationHandle materializedTableHandle = + service.executeStatement( + sessionHandle, materializedTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, materializedTableHandle); + + // verify data exists in materialized table + CommonTestUtils.waitUtil( + () -> + fetchTableData(sessionHandle, "SELECT * FROM my_materialized_table").size() + == data.size(), + Duration.ofMillis(timeout), + Duration.ofMillis(pause), + "Failed to verify the data in materialized table."); + assertThat( + fetchTableData( + sessionHandle, + "SELECT * FROM my_materialized_table where ds = '2024-01-02'") + .size()) + .isEqualTo(2); + + // remove the last element + data.remove(2); + + long currentTime = System.currentTimeMillis(); + String alterStatement = + "ALTER MATERIALIZED TABLE my_materialized_table REFRESH PARTITION (ds = '2024-01-02')"; + OperationHandle alterHandle = + service.executeStatement(sessionHandle, alterStatement, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, alterHandle); + List<RowData> result = fetchAllResults(service, sessionHandle, alterHandle); + assertThat(result.size()).isEqualTo(1); + String jobId = result.get(0).getString(0).toString(); + + MiniCluster miniCluster = MINI_CLUSTER.getMiniCluster(); + + // 1. 
verify a new job is created + Optional<JobStatusMessage> job = + miniCluster.listJobs().get(timeout, TimeUnit.MILLISECONDS).stream() + .filter(j -> j.getJobId().toString().equals(jobId)) + .findFirst(); + assertThat(job).isPresent(); + assertThat(job.get().getStartTime()).isGreaterThan(currentTime); + + // 2. verify the new job is a batch job + ArchivedExecutionGraph executionGraph = Review Comment: We can get the JobDetailsInfo using RestClusterClient, and then use ObjectMapper to read the jobType; you can refer to PR: https://github.com/apache/flink/pull/24765/files, MaterializedTableStatementITCase(L244). This may be one possible solution. I just think ArchivedExecutionGraph is too low-level an API to use here. ########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ########## @@ -236,6 +252,213 @@ void testCreateMaterializedTableInFullMode() { "Only support create materialized table in continuous refresh mode currently."); } + @Test + void testAlterMaterializedTableRefresh() throws Exception { + long timeout = Duration.ofSeconds(20).toMillis(); + long pause = Duration.ofSeconds(2).toMillis(); + // initialize session handle, create test-filesystem catalog and register it to catalog + // store + SessionHandle sessionHandle = initializeSession(); + + List<Row> data = new ArrayList<>(); + data.add(Row.of(1L, 1L, 1L, "2024-01-01")); + data.add(Row.of(2L, 2L, 2L, "2024-01-02")); + data.add(Row.of(3L, 3L, 3L, "2024-01-02")); + String dataId = TestValuesTableFactory.registerData(data); + + String sourceDdl = + String.format( + "CREATE TABLE my_source (\n" + + " order_id BIGINT,\n" + + " user_id BIGINT,\n" + + " shop_id BIGINT,\n" + + " order_created_at STRING\n" + + ")\n" + + "WITH (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = 'true',\n" + + " 'data-id' = '%s'\n" + + ")", + dataId); + service.executeStatement(sessionHandle, sourceDdl, -1, new Configuration()); Review Comment: we should wait 
until this statement is finished, then we can create the materialized table. ########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ########## @@ -236,6 +252,213 @@ void testCreateMaterializedTableInFullMode() { "Only support create materialized table in continuous refresh mode currently."); } + @Test + void testAlterMaterializedTableRefresh() throws Exception { + long timeout = Duration.ofSeconds(20).toMillis(); + long pause = Duration.ofSeconds(2).toMillis(); + // initialize session handle, create test-filesystem catalog and register it to catalog + // store + SessionHandle sessionHandle = initializeSession(); + + List<Row> data = new ArrayList<>(); + data.add(Row.of(1L, 1L, 1L, "2024-01-01")); + data.add(Row.of(2L, 2L, 2L, "2024-01-02")); + data.add(Row.of(3L, 3L, 3L, "2024-01-02")); + String dataId = TestValuesTableFactory.registerData(data); + + String sourceDdl = + String.format( + "CREATE TABLE my_source (\n" + + " order_id BIGINT,\n" + + " user_id BIGINT,\n" + + " shop_id BIGINT,\n" + + " order_created_at STRING\n" + + ")\n" + + "WITH (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = 'true',\n" + + " 'data-id' = '%s'\n" + + ")", + dataId); + service.executeStatement(sessionHandle, sourceDdl, -1, new Configuration()); + + String materializedTableDDL = + "CREATE MATERIALIZED TABLE my_materialized_table" + + " PARTITIONED BY (ds)\n" + + " WITH(\n" + + " 'format' = 'debezium-json'\n" + + " )\n" + + " FRESHNESS = INTERVAL '2' SECOND\n" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" + + " COUNT(order_id) AS order_cnt\n" + + " FROM (\n" + + " SELECT user_id, shop_id, order_created_at AS ds, order_id FROM my_source" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + + OperationHandle materializedTableHandle = + service.executeStatement( + sessionHandle, materializedTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, 
materializedTableHandle); + + // verify data exists in materialized table + CommonTestUtils.waitUtil( + () -> + fetchTableData(sessionHandle, "SELECT * FROM my_materialized_table").size() + == data.size(), + Duration.ofMillis(timeout), + Duration.ofMillis(pause), + "Failed to verify the data in materialized table."); + assertThat( + fetchTableData( + sessionHandle, + "SELECT * FROM my_materialized_table where ds = '2024-01-02'") + .size()) + .isEqualTo(2); + + // remove the last element + data.remove(2); + + long currentTime = System.currentTimeMillis(); + String alterStatement = + "ALTER MATERIALIZED TABLE my_materialized_table REFRESH PARTITION (ds = '2024-01-02')"; + OperationHandle alterHandle = + service.executeStatement(sessionHandle, alterStatement, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, alterHandle); + List<RowData> result = fetchAllResults(service, sessionHandle, alterHandle); + assertThat(result.size()).isEqualTo(1); + String jobId = result.get(0).getString(0).toString(); + + MiniCluster miniCluster = MINI_CLUSTER.getMiniCluster(); Review Comment: Why not use `@InjectClusterClient RestClusterClient<?> restClusterClient` directly? I think using MiniCluster is too heavy here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org