lsyldliu commented on code in PR #25880:
URL: https://github.com/apache/flink/pull/25880#discussion_r1900846618
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -804,6 +808,78 @@ protected static String getRefreshStatement(
return insertStatement.toString();
}
+ private ResultFetcher callAlterMaterializedTableAsQueryOperation(
+ OperationExecutor operationExecutor,
+ OperationHandle handle,
+ AlterMaterializedTableAsQueryOperation op) {
+ ObjectIdentifier tableIdentifier = op.getTableIdentifier();
+ CatalogMaterializedTable materializedTable =
+ getCatalogMaterializedTable(operationExecutor,
tableIdentifier);
+
+ // 1. suspend the materialized table
+ if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
+ != materializedTable.getRefreshStatus()) {
+ if (CatalogMaterializedTable.RefreshMode.CONTINUOUS
+ == materializedTable.getRefreshMode()) {
+ suspendContinuousRefreshJob(
+ operationExecutor, handle, tableIdentifier,
materializedTable);
+ } else {
+ suspendRefreshWorkflow(
+ operationExecutor, handle, tableIdentifier,
materializedTable);
+ }
+ }
+
+ // 2. replace query definition and resume the materialized table
+ // alter materialized table schema
+ operationExecutor.callExecutableOperation(handle, op);
+ ResolvedCatalogMaterializedTable updatedMaterializedTable =
+ getCatalogMaterializedTable(operationExecutor,
tableIdentifier);
+
+ // 3. resume the materialized table
+ if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
+ != materializedTable.getRefreshStatus()) {
+ if (CatalogMaterializedTable.RefreshMode.CONTINUOUS
+ == materializedTable.getRefreshMode()) {
+ executeContinuousRefreshJob(
+ operationExecutor,
+ handle,
+ updatedMaterializedTable,
+ tableIdentifier,
+ Collections.emptyMap(),
+ Optional.empty());
+ } else {
+ // resume workflow
+ resumeRefreshWorkflow(
+ operationExecutor,
+ handle,
+ tableIdentifier,
+ updatedMaterializedTable,
+ Collections.emptyMap());
+ }
+ } else {
+ if (CatalogMaterializedTable.RefreshMode.CONTINUOUS
+ == materializedTable.getRefreshMode()) {
+ // we should reset the savepoint path after the alter operation
+ ContinuousRefreshHandler refreshHandler =
+ deserializeContinuousHandler(
+
materializedTable.getSerializedRefreshHandler());
+ ContinuousRefreshHandler resetHandler =
+ new ContinuousRefreshHandler(
+ refreshHandler.getExecutionTarget(),
refreshHandler.getJobId());
+ updateRefreshHandler(
+ operationExecutor,
+ handle,
+ tableIdentifier,
+ updatedMaterializedTable,
+ updatedMaterializedTable.getRefreshStatus(),
+ resetHandler.asSummaryString(),
+ serializeContinuousHandler(resetHandler));
+ }
+ }
+
Review Comment:
We should execute the alter materialized table operation here, after all the
preparatory work has succeeded.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]