This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4420805067303841524f7b9987b33c8266b8cb87 Author: Feng Jin <jinfeng1...@gmail.com> AuthorDate: Sun Jan 5 20:06:59 2025 +0800 [FLINK-36994][table] Support executing ALTER MATERIALIZED TABLE AS operation --- .../MaterializedTableManager.java | 173 +++++++++- .../service/MaterializedTableStatementITCase.java | 349 +++++++++++++++++++++ 2 files changed, 518 insertions(+), 4 deletions(-) diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java index 2398e72172a..2c2ea78f100 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java @@ -34,6 +34,7 @@ import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.catalog.TableChange.MaterializedTableChange; import org.apache.flink.table.data.GenericMapData; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; @@ -48,6 +49,7 @@ import org.apache.flink.table.gateway.service.result.ResultFetcher; import org.apache.flink.table.gateway.service.utils.SqlExecutionException; import org.apache.flink.table.operations.command.DescribeJobOperation; import org.apache.flink.table.operations.command.StopJobOperation; +import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation; import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperation; import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableRefreshOperation; import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableResumeOperation; @@ -174,6 +176,9 @@ public class MaterializedTableManager { } else if (op instanceof DropMaterializedTableOperation) { return callDropMaterializedTableOperation( operationExecutor, handle, (DropMaterializedTableOperation) op); + } else if (op instanceof AlterMaterializedTableAsQueryOperation) { + return callAlterMaterializedTableAsQueryOperation( + operationExecutor, handle, (AlterMaterializedTableAsQueryOperation) op); } throw new SqlExecutionException( @@ -315,7 +320,7 @@ public class MaterializedTableManager { return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false); } - private void suspendContinuousRefreshJob( + private CatalogMaterializedTable suspendContinuousRefreshJob( OperationExecutor operationExecutor, OperationHandle handle, ObjectIdentifier tableIdentifier, @@ -341,7 +346,7 @@ public class MaterializedTableManager { refreshHandler.getJobId(), savepointPath); - updateRefreshHandler( + return updateRefreshHandler( operationExecutor, handle, tableIdentifier, @@ -804,6 +809,164 @@ public class MaterializedTableManager { return insertStatement.toString(); } + private ResultFetcher callAlterMaterializedTableAsQueryOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableAsQueryOperation op) { + ObjectIdentifier tableIdentifier = op.getTableIdentifier(); + CatalogMaterializedTable oldMaterializedTable = + getCatalogMaterializedTable(operationExecutor, tableIdentifier); + + if (CatalogMaterializedTable.RefreshMode.FULL == oldMaterializedTable.getRefreshMode()) { + // directly apply the alter operation + AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation = + new AlterMaterializedTableChangeOperation( + tableIdentifier, op.getTableChanges(), op.getNewMaterializedTable()); + return operationExecutor.callExecutableOperation( + handle, alterMaterializedTableChangeOperation); + } + + if (CatalogMaterializedTable.RefreshStatus.ACTIVATED + == oldMaterializedTable.getRefreshStatus()) { + // 1. suspend the materialized table + CatalogMaterializedTable suspendMaterializedTable = + suspendContinuousRefreshJob( + operationExecutor, handle, tableIdentifier, oldMaterializedTable); + + // 2. alter materialized table schema & query definition + CatalogMaterializedTable updatedMaterializedTable = + op.getNewMaterializedTable() + .copy( + suspendMaterializedTable.getRefreshStatus(), + suspendMaterializedTable + .getRefreshHandlerDescription() + .orElse(null), + suspendMaterializedTable.getSerializedRefreshHandler()); + AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation = + new AlterMaterializedTableChangeOperation( + tableIdentifier, op.getTableChanges(), updatedMaterializedTable); + operationExecutor.callExecutableOperation( + handle, alterMaterializedTableChangeOperation); + + // 3. resume the materialized table + try { + executeContinuousRefreshJob( + operationExecutor, + handle, + updatedMaterializedTable, + tableIdentifier, + Collections.emptyMap(), + Optional.empty()); + } catch (Exception e) { + // Roll back the changes to the materialized table and restore the continuous + // refresh job + LOG.warn( + "Failed to start the continuous refresh job for materialized table {} using new query {}, rollback to origin query {}.", + tableIdentifier, + op.getNewMaterializedTable().getDefinitionQuery(), + suspendMaterializedTable.getDefinitionQuery(), + e); + + AlterMaterializedTableChangeOperation rollbackChangeOperation = + generateRollbackAlterMaterializedTableOperation( + suspendMaterializedTable, alterMaterializedTableChangeOperation); + operationExecutor.callExecutableOperation(handle, rollbackChangeOperation); + + ContinuousRefreshHandler continuousRefreshHandler = + deserializeContinuousHandler( + suspendMaterializedTable.getSerializedRefreshHandler()); + executeContinuousRefreshJob( + operationExecutor, + handle, + suspendMaterializedTable, + tableIdentifier, + Collections.emptyMap(), + continuousRefreshHandler.getRestorePath()); + + throw new SqlExecutionException( + String.format( + "Failed to start the continuous refresh job using new query %s when altering materialized table %s select query.", + op.getNewMaterializedTable().getDefinitionQuery(), tableIdentifier), + e); + } + } else if (CatalogMaterializedTable.RefreshStatus.SUSPENDED + == oldMaterializedTable.getRefreshStatus()) { + // alter schema & definition query & refresh handler (reset savepoint path of refresh + // handler) + List<MaterializedTableChange> tableChanges = new ArrayList<>(op.getTableChanges()); + TableChange.ModifyRefreshHandler modifyRefreshHandler = + genereateResetSavepointTableChange( + oldMaterializedTable.getSerializedRefreshHandler()); + tableChanges.add(modifyRefreshHandler); + + CatalogMaterializedTable updatedMaterializedTable = + op.getNewMaterializedTable() + .copy( + oldMaterializedTable.getRefreshStatus(), + modifyRefreshHandler.getRefreshHandlerDesc(), + modifyRefreshHandler.getRefreshHandlerBytes()); + AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation = + new AlterMaterializedTableChangeOperation( + tableIdentifier, tableChanges, updatedMaterializedTable); + + operationExecutor.callExecutableOperation( + handle, alterMaterializedTableChangeOperation); + } else { + throw new SqlExecutionException( + String.format( + "Materialized table %s is being initialized and does not support alter operation.", + tableIdentifier)); + } + + return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false); + } + + private AlterMaterializedTableChangeOperation generateRollbackAlterMaterializedTableOperation( + CatalogMaterializedTable oldMaterializedTable, + AlterMaterializedTableChangeOperation op) { + List<MaterializedTableChange> tableChanges = op.getTableChanges(); + List<MaterializedTableChange> rollbackChanges = new ArrayList<>(); + + for (TableChange tableChange : tableChanges) { + if (tableChange instanceof TableChange.AddColumn) { + TableChange.AddColumn addColumn = (TableChange.AddColumn) tableChange; + rollbackChanges.add(TableChange.dropColumn(addColumn.getColumn().getName())); + } else if (tableChange instanceof TableChange.ModifyRefreshHandler) { + rollbackChanges.add( + TableChange.modifyRefreshHandler( + oldMaterializedTable.getRefreshHandlerDescription().orElse(null), + oldMaterializedTable.getSerializedRefreshHandler())); + } else if (tableChange instanceof TableChange.ModifyDefinitionQuery) { + rollbackChanges.add( + TableChange.modifyDefinitionQuery( + oldMaterializedTable.getDefinitionQuery())); + } else { + throw new ValidationException( + String.format( + "Failed to generate rollback changes for materialized table '%s'. " + + "Unsupported table change detected: %s. ", + op.getTableIdentifier(), tableChange)); + } + } + + return new AlterMaterializedTableChangeOperation( + op.getTableIdentifier(), rollbackChanges, oldMaterializedTable); + } + + private TableChange.ModifyRefreshHandler genereateResetSavepointTableChange( + byte[] serializedContinuousHandler) { + ContinuousRefreshHandler continuousRefreshHandler = + deserializeContinuousHandler(serializedContinuousHandler); + ContinuousRefreshHandler resetedRefreshHandler = + new ContinuousRefreshHandler( + continuousRefreshHandler.getExecutionTarget(), + continuousRefreshHandler.getJobId()); + + return TableChange.modifyRefreshHandler( + resetedRefreshHandler.asSummaryString(), + serializeContinuousHandler(resetedRefreshHandler)); + } + private ResultFetcher callDropMaterializedTableOperation( OperationExecutor operationExecutor, OperationHandle handle, @@ -1018,7 +1181,7 @@ public class MaterializedTableManager { return (ResolvedCatalogMaterializedTable) resolvedCatalogBaseTable; } - private void updateRefreshHandler( + private CatalogMaterializedTable updateRefreshHandler( OperationExecutor operationExecutor, OperationHandle operationHandle, ObjectIdentifier materializedTableIdentifier, @@ -1029,7 +1192,7 @@ public class MaterializedTableManager { CatalogMaterializedTable updatedMaterializedTable = catalogMaterializedTable.copy( refreshStatus, refreshHandlerSummary, serializedRefreshHandler); - List<TableChange> tableChanges = new ArrayList<>(); + List<MaterializedTableChange> tableChanges = new ArrayList<>(); tableChanges.add(TableChange.modifyRefreshStatus(refreshStatus)); tableChanges.add( TableChange.modifyRefreshHandler(refreshHandlerSummary, serializedRefreshHandler)); @@ -1039,6 +1202,8 @@ public class MaterializedTableManager { // update RefreshHandler to Catalog operationExecutor.callExecutableOperation( operationHandle, alterMaterializedTableChangeOperation); + + return updatedMaterializedTable; } /** Generate insert statement for materialized table. */ diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java index b5f5fc3bd5f..49754e42aad 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java @@ -72,6 +72,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTING_INTERVAL; import static org.apache.flink.table.api.config.TableConfigOptions.RESOURCES_DOWNLOAD_DIR; @@ -1027,6 +1028,348 @@ public class MaterializedTableStatementITCase extends AbstractMaterializedTableS fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops"))); + + dropMaterializedTable( + ObjectIdentifier.of(fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops")); + } + + @Test + void testAlterMaterializedTableAsQueryInFullMode() throws Exception { + createAndVerifyCreateMaterializedTableWithData( + "users_shops", Collections.emptyList(), Collections.emptyMap(), RefreshMode.FULL); + + ResolvedCatalogMaterializedTable oldTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + // Alter materialized table as query in full mode + String alterMaterializedTableAsQueryDDL = + "ALTER MATERIALIZED TABLE users_shops" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" + + " COUNT(order_id) AS order_cnt,\n" + + " SUM(order_amount) AS order_amount_sum\n" + + " FROM (\n" + + " SELECT user_id, shop_id, order_created_at AS ds, order_id, 1 as order_amount FROM my_source" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + + OperationHandle alterMaterializedTableAsQueryHandle = + service.executeStatement( + sessionHandle, alterMaterializedTableAsQueryDDL, -1, new Configuration()); + + awaitOperationTermination(service, sessionHandle, alterMaterializedTableAsQueryHandle); + + // verify the altered materialized table + ResolvedCatalogMaterializedTable newTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + assertThat(getAddedColumns(newTable.getResolvedSchema(), oldTable.getResolvedSchema())) + .isEqualTo( + Collections.singletonList( + Column.physical("order_amount_sum", DataTypes.INT()))); + assertThat(newTable.getDefinitionQuery()) + .isEqualTo( + String.format( + "SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`, COUNT(`tmp`.`order_id`) AS `order_cnt`, SUM(`tmp`.`order_amount`) AS `order_amount_sum`\n" + + "FROM (SELECT `my_source`.`user_id`, `my_source`.`shop_id`, `my_source`.`order_created_at` AS `ds`, `my_source`.`order_id`, 1 AS `order_amount`\n" + + "FROM `%s`.`test_db`.`my_source`) AS `tmp`\n" + + "GROUP BY ROW(`tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`)", + fileSystemCatalogName)); + // the refresh handler in full mode should be the same as the old one + assertThat(oldTable.getSerializedRefreshHandler()) + .isEqualTo(newTable.getSerializedRefreshHandler()); + assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness()); + } + + @Test + void testAlterMaterializedTableAsQueryInFullModeWithSuspendStatus() throws Exception { + createAndVerifyCreateMaterializedTableWithData( + "users_shops", Collections.emptyList(), Collections.emptyMap(), RefreshMode.FULL); + + ResolvedCatalogMaterializedTable oldTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + // Alter materialized table suspend + String alterMaterializedTableSuspendDDL = "ALTER MATERIALIZED TABLE users_shops SUSPEND"; + OperationHandle alterMaterializedTableSuspendHandle = + service.executeStatement( + sessionHandle, alterMaterializedTableSuspendDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, alterMaterializedTableSuspendHandle); + + // Alter materialized table as query in full mode + String alterMaterializedTableAsQueryDDL = + "ALTER MATERIALIZED TABLE users_shops" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" + + " COUNT(order_id) AS order_cnt,\n" + + " SUM(order_amount) AS order_amount_sum\n" + + " FROM (\n" + + " SELECT user_id, shop_id, order_created_at AS ds, order_id, 1 as order_amount FROM my_source" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + + OperationHandle alterMaterializedTableAsQueryHandle = + service.executeStatement( + sessionHandle, alterMaterializedTableAsQueryDDL, -1, new Configuration()); + + awaitOperationTermination(service, sessionHandle, alterMaterializedTableAsQueryHandle); + + // verify the altered materialized table + ResolvedCatalogMaterializedTable newTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + assertThat(getAddedColumns(newTable.getResolvedSchema(), oldTable.getResolvedSchema())) + .isEqualTo( + Collections.singletonList( + Column.physical("order_amount_sum", DataTypes.INT()))); + assertThat(newTable.getDefinitionQuery()) + .isEqualTo( + String.format( + "SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`, COUNT(`tmp`.`order_id`) AS `order_cnt`, SUM(`tmp`.`order_amount`) AS `order_amount_sum`\n" + + "FROM (SELECT `my_source`.`user_id`, `my_source`.`shop_id`, `my_source`.`order_created_at` AS `ds`, `my_source`.`order_id`, 1 AS `order_amount`\n" + + "FROM `%s`.`test_db`.`my_source`) AS `tmp`\n" + + "GROUP BY ROW(`tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`)", + fileSystemCatalogName)); + + // the refresh handler in full mode should be the same as the old one + assertThat(oldTable.getSerializedRefreshHandler()) + .isEqualTo(newTable.getSerializedRefreshHandler()); + assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness()); + } + + @Test + void testAlterMaterializedTableAsQueryInContinuousMode(@TempDir Path temporaryPath) + throws Exception { + String materializedTableDDL = + "CREATE MATERIALIZED TABLE users_shops (" + + " PRIMARY KEY (ds, user_id) not enforced)" + + " PARTITIONED BY (ds)\n" + + " WITH(\n" + + " 'format' = 'debezium-json'\n" + + " )\n" + + " FRESHNESS = INTERVAL '30' SECOND\n" + + " AS SELECT \n" + + " coalesce(user_id, 0) as user_id,\n" + + " shop_id,\n" + + " coalesce(ds, '') as ds,\n" + + " SUM (payment_amount_cents) AS payed_buy_fee_sum\n" + + " FROM (\n" + + " SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + OperationHandle materializedTableHandle = + service.executeStatement( + sessionHandle, materializedTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, materializedTableHandle); + ResolvedCatalogMaterializedTable oldTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + // verify background job is running + ContinuousRefreshHandler activeRefreshHandler = + ContinuousRefreshHandlerSerializer.INSTANCE.deserialize( + oldTable.getSerializedRefreshHandler(), getClass().getClassLoader()); + waitUntilAllTasksAreRunning( + restClusterClient, JobID.fromHexString(activeRefreshHandler.getJobId())); + + // setup savepoint dir + String savepointDir = "file://" + temporaryPath.toAbsolutePath(); + String setupSavepointDDL = + "SET 'execution.checkpointing.savepoint-dir' = '" + savepointDir + "'"; + OperationHandle setupSavepointHandle = + service.executeStatement(sessionHandle, setupSavepointDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, setupSavepointHandle); + + String alterTableDDL = + "ALTER MATERIALIZED TABLE users_shops" + + " AS SELECT \n" + + " coalesce(user_id, 0) as user_id,\n" + + " shop_id,\n" + + " coalesce(ds, '') as ds,\n" + + " SUM (payment_amount_cents) AS payed_buy_fee_sum,\n" + + " SUM (1) AS pv\n" + + " FROM (\n" + + " SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + OperationHandle alterTableHandle = + service.executeStatement(sessionHandle, alterTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, alterTableHandle); + ResolvedCatalogMaterializedTable newTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + assertThat(getAddedColumns(newTable.getResolvedSchema(), oldTable.getResolvedSchema())) + .isEqualTo(Collections.singletonList(Column.physical("pv", DataTypes.INT()))); + assertThat(newTable.getResolvedSchema().getPrimaryKey()) + .isEqualTo(oldTable.getResolvedSchema().getPrimaryKey()); + assertThat(newTable.getResolvedSchema().getWatermarkSpecs()) + .isEqualTo(oldTable.getResolvedSchema().getWatermarkSpecs()); + assertThat(newTable.getDefinitionQuery()) + .isEqualTo( + String.format( + "SELECT COALESCE(`tmp`.`user_id`, 0) AS `user_id`, `tmp`.`shop_id`, COALESCE(`tmp`.`ds`, '') AS `ds`, SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`\n" + + "FROM (SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`, `DATE_FORMAT`(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`, `datagenSource`.`payment_amount_cents`\n" + + "FROM `%s`.`test_db`.`datagenSource`) AS `tmp`\n" + + "GROUP BY ROW(`tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`)", + fileSystemCatalogName)); + assertThat(oldTable.getSerializedRefreshHandler()) + .isNotEqualTo(newTable.getSerializedRefreshHandler()); + + // verify the new continuous job is start without savepoint + ContinuousRefreshHandler newContinuousRefreshHandler = + ContinuousRefreshHandlerSerializer.INSTANCE.deserialize( + newTable.getSerializedRefreshHandler(), + Thread.currentThread().getContextClassLoader()); + Optional<String> restorePath = + getJobRestoreSavepointPath( + restClusterClient, newContinuousRefreshHandler.getJobId()); + assertThat(restorePath).isEmpty(); + + // drop the materialized table + dropMaterializedTable( + ObjectIdentifier.of(fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops")); + } + + @Test + void testAlterMaterializedTableAsQueryInContinuousModeWithSuspendStatus( + @TempDir Path temporaryPath) throws Exception { + String materializedTableDDL = + "CREATE MATERIALIZED TABLE users_shops" + + " PARTITIONED BY (ds)\n" + + " WITH(\n" + + " 'format' = 'debezium-json'\n" + + " )\n" + + " FRESHNESS = INTERVAL '30' SECOND\n" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" + + " SUM (payment_amount_cents) AS payed_buy_fee_sum\n" + + " FROM (\n" + + " SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + OperationHandle materializedTableHandle = + service.executeStatement( + sessionHandle, materializedTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, materializedTableHandle); + ResolvedCatalogMaterializedTable oldTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + // verify background job is running + ContinuousRefreshHandler activeRefreshHandler = + ContinuousRefreshHandlerSerializer.INSTANCE.deserialize( + oldTable.getSerializedRefreshHandler(), getClass().getClassLoader()); + waitUntilAllTasksAreRunning( + restClusterClient, JobID.fromHexString(activeRefreshHandler.getJobId())); + + // setup savepoint dir + String savepointDir = "file://" + temporaryPath.toAbsolutePath(); + String setupSavepointDDL = + "SET 'execution.checkpointing.savepoint-dir' = '" + savepointDir + "'"; + OperationHandle setupSavepointHandle = + service.executeStatement(sessionHandle, setupSavepointDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, setupSavepointHandle); + + // suspend materialized table + String suspendTableDDL = "ALTER MATERIALIZED TABLE users_shops SUSPEND"; + OperationHandle suspendTableHandle = + service.executeStatement(sessionHandle, suspendTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, suspendTableHandle); + + String alterTableDDL = + "ALTER MATERIALIZED TABLE users_shops" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" + + " SUM (payment_amount_cents) AS payed_buy_fee_sum,\n" + + " SUM (1) AS pv\n" + + " FROM (\n" + + " SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + OperationHandle alterTableHandle = + service.executeStatement(sessionHandle, alterTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, alterTableHandle); + ResolvedCatalogMaterializedTable newTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + assertThat(getAddedColumns(newTable.getResolvedSchema(), oldTable.getResolvedSchema())) + .isEqualTo(Collections.singletonList(Column.physical("pv", DataTypes.INT()))); + assertThat(newTable.getDefinitionQuery()) + .isEqualTo( + String.format( + "SELECT `tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`, SUM(`tmp`.`payment_amount_cents`) AS `payed_buy_fee_sum`, SUM(1) AS `pv`\n" + + "FROM (SELECT `datagenSource`.`user_id`, `datagenSource`.`shop_id`, `DATE_FORMAT`(`datagenSource`.`order_created_at`, 'yyyy-MM-dd') AS `ds`, `datagenSource`.`payment_amount_cents`\n" + + "FROM `%s`.`test_db`.`datagenSource`) AS `tmp`\n" + + "GROUP BY ROW(`tmp`.`user_id`, `tmp`.`shop_id`, `tmp`.`ds`)", + fileSystemCatalogName)); + assertThat(oldTable.getSerializedRefreshHandler()) + .isEqualTo(newTable.getSerializedRefreshHandler()); + assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness()); + + // verify the restore path is empty + ContinuousRefreshHandler newContinuousRefreshHandler = + ContinuousRefreshHandlerSerializer.INSTANCE.deserialize( + newTable.getSerializedRefreshHandler(), + Thread.currentThread().getContextClassLoader()); + assertThat(newContinuousRefreshHandler.getRestorePath()).isEmpty(); + + // drop the materialized table + dropMaterializedTable( + ObjectIdentifier.of(fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops")); } @Test @@ -1574,6 +1917,12 @@ public class MaterializedTableStatementITCase extends AbstractMaterializedTableS fileSystemCatalogName, TEST_DEFAULT_DATABASE, "my_materialized_table")); } + private List<Column> getAddedColumns(ResolvedSchema newSchema, ResolvedSchema oldSchema) { + return newSchema.getColumns().stream() + .filter(column -> !oldSchema.getColumns().contains(column)) + .collect(Collectors.toList()); + } + private int getPartitionSize(List<Row> data, String partition) { return (int) data.stream().filter(row -> row.getField(3).toString().equals(partition)).count();