hackergin commented on code in PR #25880:
URL: https://github.com/apache/flink/pull/25880#discussion_r1907276654
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -804,6 +809,160 @@ protected static String getRefreshStatement(
return insertStatement.toString();
}
+ private ResultFetcher callAlterMaterializedTableAsQueryOperation(
+ OperationExecutor operationExecutor,
+ OperationHandle handle,
+ AlterMaterializedTableAsQueryOperation op) {
+ ObjectIdentifier tableIdentifier = op.getTableIdentifier();
+ CatalogMaterializedTable oldMaterializedTable =
+ getCatalogMaterializedTable(operationExecutor,
tableIdentifier);
+
+ if (CatalogMaterializedTable.RefreshMode.FULL ==
oldMaterializedTable.getRefreshMode()) {
+ // directly apply the alter operation
+ AlterMaterializedTableChangeOperation
alterMaterializedTableChangeOperation =
+ new AlterMaterializedTableChangeOperation(
+ tableIdentifier, op.getTableChanges(),
op.getNewMaterializedTable());
+ return operationExecutor.callExecutableOperation(
+ handle, alterMaterializedTableChangeOperation);
+ }
+
+ if (CatalogMaterializedTable.RefreshStatus.ACTIVATED
+ == oldMaterializedTable.getRefreshStatus()) {
+ // 1. suspend the materialized table
+ CatalogMaterializedTable suspendMaterializedTable =
+ suspendContinuousRefreshJob(
+ operationExecutor, handle, tableIdentifier,
oldMaterializedTable);
+
+ // 2. alter materialized table schema & query definition
+ CatalogMaterializedTable updatedMaterializedTable =
+ op.getNewMaterializedTable()
+ .copy(
+
suspendMaterializedTable.getRefreshStatus(),
+ suspendMaterializedTable
+ .getRefreshHandlerDescription()
+ .orElse(null),
+
suspendMaterializedTable.getSerializedRefreshHandler());
+ AlterMaterializedTableChangeOperation
alterMaterializedTableChangeOperation =
+ new AlterMaterializedTableChangeOperation(
+ tableIdentifier, op.getTableChanges(),
updatedMaterializedTable);
+ operationExecutor.callExecutableOperation(
+ handle, alterMaterializedTableChangeOperation);
+
+ // 3. resume the materialized table
+ try {
+ executeContinuousRefreshJob(
+ operationExecutor,
+ handle,
+ updatedMaterializedTable,
+ tableIdentifier,
+ Collections.emptyMap(),
+ Optional.empty());
+ } catch (Exception e) {
+ // Roll back the changes to the materialized table and restore
the continuous
+ // refresh job
+ LOG.warn(
+ "Failed to resume the continuous refresh job for
materialized table {}, rollback the alter materialized table as query
operation.",
+ tableIdentifier,
+ e);
+
+ AlterMaterializedTableChangeOperation rollbackChangeOperation =
+ generateRollbackAlterMaterializedTableOperation(
+ suspendMaterializedTable,
alterMaterializedTableChangeOperation);
+ operationExecutor.callExecutableOperation(handle,
rollbackChangeOperation);
+
+ ContinuousRefreshHandler continuousRefreshHandler =
+ deserializeContinuousHandler(
+
suspendMaterializedTable.getSerializedRefreshHandler());
+ executeContinuousRefreshJob(
+ operationExecutor,
+ handle,
+ suspendMaterializedTable,
+ tableIdentifier,
+ Collections.emptyMap(),
+ continuousRefreshHandler.getRestorePath());
+
+ throw new SqlExecutionException(
+ String.format(
+ "Failed to alter materialized table as query
operation for materialized table %s.",
Review Comment:
Here, I understand that it’s not about resuming the failure of the original
task, but rather failing to start with a new query, keeping it consistent with
the above log.warn()
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]