This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8a7e54993980d84886c5c24d2fe9d2f7f42b9f74 Author: Rui Fan <[email protected]> AuthorDate: Sun Dec 17 11:56:46 2023 +0800 [FLINK-33565][Exception] Archive exceptions into the exception history immediately when they occur, instead of archiving them when restarting --- .../org/apache/flink/runtime/scheduler/DefaultScheduler.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java index 393d6329473..e3b046301b4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java @@ -359,17 +359,13 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio final CompletableFuture<?> cancelFuture = cancelTasksAsync(verticesToRestart); - final FailureHandlingResultSnapshot failureHandlingResultSnapshot = - createFailureHandlingResultSnapshot(failureHandlingResult); + archiveFromFailureHandlingResult( + createFailureHandlingResultSnapshot(failureHandlingResult)); delayExecutor.schedule( () -> FutureUtils.assertNoException( cancelFuture.thenRunAsync( - () -> { - archiveFromFailureHandlingResult( - failureHandlingResultSnapshot); - restartTasks(executionVertexVersions, globalRecovery); - }, + () -> restartTasks(executionVertexVersions, globalRecovery), getMainThreadExecutor())), failureHandlingResult.getRestartDelayMS(), TimeUnit.MILLISECONDS);
