lsyldliu commented on code in PR #24888:
URL: https://github.com/apache/flink/pull/24888#discussion_r1627675280
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -782,35 +783,12 @@ private ResultFetcher callDropMaterializedTableOperation(
if (CatalogMaterializedTable.RefreshStatus.ACTIVATED
== materializedTable.getRefreshStatus()) {
- ContinuousRefreshHandler refreshHandler =
-
deserializeContinuousHandler(materializedTable.getSerializedRefreshHandler());
- // get job running status
- JobStatus jobStatus = getJobStatus(operationExecutor, handle,
refreshHandler);
- if (!jobStatus.isTerminalState()) {
- try {
- cancelJob(operationExecutor, handle,
refreshHandler.getJobId());
- } catch (Exception e) {
- jobStatus = getJobStatus(operationExecutor, handle,
refreshHandler);
- if (!jobStatus.isTerminalState()) {
- throw new SqlExecutionException(
- String.format(
- "Failed to drop the materialized table
%s because the continuous refresh job %s could not be canceled."
- + " The current status of the
continuous refresh job is %s.",
- tableIdentifier,
refreshHandler.getJobId(), jobStatus),
- e);
- } else {
- LOG.warn(
- "An exception occurred while canceling the
continuous refresh job {} for materialized table {},"
- + " but since the job is in a terminal
state, skip the cancel operation.",
- refreshHandler.getJobId(),
- tableIdentifier);
- }
- }
+ if (CatalogMaterializedTable.RefreshMode.CONTINUOUS
+ == materializedTable.getRefreshMode()) {
+ cancelContinuousRefreshJob(
+ operationExecutor, handle, tableIdentifier,
materializedTable);
} else {
- LOG.info(
- "No need to cancel the continuous refresh job {} for
materialized table {} as it is not currently running.",
- refreshHandler.getJobId(),
- tableIdentifier);
+ deleteRefreshWorkflow(tableIdentifier, materializedTable);
}
} else if (CatalogMaterializedTable.RefreshStatus.INITIALIZING
Review Comment:
If the refresh status is `SUSPENDED`, we also need to drop the refresh
workflow before drop the materialized table.
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/workflow/scheduler/EmbeddedQuartzScheduler.java:
##########
@@ -271,11 +271,6 @@ public void deleteScheduleWorkflow(String workflowName,
String workflowGroup)
JobKey jobKey = JobKey.jobKey(workflowName, workflowGroup);
lock.writeLock().lock();
try {
- String errorMsg =
- String.format(
- "Failed to delete a non-existent quartz schedule
job: %s.", jobKey);
- checkJobExists(jobKey, errorMsg);
-
quartzScheduler.deleteJob(jobKey);
Review Comment:
Do you verify whether `quartzScheduler.deleteJob` throws an exception when
deleting a not exist job?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]