This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 63817b5  [FLINK-26680][coordination] Properly handle deleted jobs during recovery
63817b5 is described below

commit 63817b5ffdf7ba24a168aeec95464d13e4d78e13
Author: Chesnay Schepler <[email protected]>
AuthorDate: Thu Mar 17 11:22:26 2022 +0100

    [FLINK-26680][coordination] Properly handle deleted jobs during recovery
---
 .../runner/SessionDispatcherLeaderProcess.java     | 14 +++--
 .../runner/SessionDispatcherLeaderProcessTest.java | 66 ++++++++++++++++++++++
 2 files changed, 76 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java
index 6f9d0bf..6318700 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java
@@ -147,7 +147,7 @@ public class SessionDispatcherLeaderProcess extends AbstractDispatcherLeaderProc
 
         for (JobID jobId : jobIds) {
             if (!recoveredDirtyJobResults.contains(jobId)) {
-                recoveredJobGraphs.add(recoverJob(jobId));
+                tryRecoverJob(jobId).ifPresent(recoveredJobGraphs::add);
             }
         }
 
@@ -164,10 +164,16 @@ public class SessionDispatcherLeaderProcess extends AbstractDispatcherLeaderProc
         }
     }
 
-    private JobGraph recoverJob(JobID jobId) {
+    private Optional<JobGraph> tryRecoverJob(JobID jobId) {
         log.info("Trying to recover job with job id {}.", jobId);
         try {
-            return jobGraphStore.recoverJobGraph(jobId);
+            final JobGraph jobGraph = jobGraphStore.recoverJobGraph(jobId);
+            if (jobGraph == null) {
+                log.info(
+                        "Skipping recovery of job with job id {}, because it already finished in a previous execution",
+                        jobId);
+            }
+            return Optional.ofNullable(jobGraph);
         } catch (Exception e) {
             throw new FlinkRuntimeException(
                     String.format("Could not recover job with job id %s.", jobId), e);
@@ -264,7 +270,7 @@ public class SessionDispatcherLeaderProcess extends AbstractDispatcherLeaderProc
     }
 
     private Optional<JobGraph> recoverJobIfRunning(JobID jobId) {
-        return supplyUnsynchronizedIfRunning(() -> recoverJob(jobId));
+        return supplyUnsynchronizedIfRunning(() -> tryRecoverJob(jobId)).flatMap(x -> x);
     }
 
     @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java
index f48ca06..a90a245 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java
@@ -46,6 +46,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -238,6 +239,71 @@ public class SessionDispatcherLeaderProcessTest {
     }
 
     @Test
+    public void testRecoveryWhileJobGraphRecoveryIsScheduledConcurrently() throws Exception {
+        final JobResult dirtyJobResult =
+                TestingJobResultStore.createSuccessfulJobResult(new JobID());
+
+        OneShotLatch recoveryInitiatedLatch = new OneShotLatch();
+        OneShotLatch jobGraphAddedLatch = new OneShotLatch();
+
+        jobGraphStore =
+                TestingJobGraphStore.newBuilder()
+                        // mimic behavior when recovering a JobGraph that is marked for deletion
+                        .setRecoverJobGraphFunction((jobId, jobs) -> null)
+                        .build();
+
+        jobResultStore =
+                TestingJobResultStore.builder()
+                        .withGetDirtyResultsSupplier(
+                                () -> {
+                                    recoveryInitiatedLatch.trigger();
+                                    try {
+                                        jobGraphAddedLatch.await();
+                                    } catch (InterruptedException e) {
+                                        Thread.currentThread().interrupt();
+                                    }
+                                    return Collections.singleton(dirtyJobResult);
+                                })
+                        .build();
+
+        final CompletableFuture<Collection<JobGraph>> recoveredJobGraphsFuture =
+                new CompletableFuture<>();
+        final CompletableFuture<Collection<JobResult>> recoveredDirtyJobResultsFuture =
+                new CompletableFuture<>();
+        dispatcherServiceFactory =
+                (ignoredDispatcherId,
+                        recoveredJobs,
+                        recoveredDirtyJobResults,
+                        ignoredJobGraphWriter,
+                        ignoredJobResultStore) -> {
+                    recoveredJobGraphsFuture.complete(recoveredJobs);
+                    recoveredDirtyJobResultsFuture.complete(recoveredDirtyJobResults);
+                    return TestingDispatcherGatewayService.newBuilder().build();
+                };
+
+        try (final SessionDispatcherLeaderProcess dispatcherLeaderProcess =
+                createDispatcherLeaderProcess()) {
+            dispatcherLeaderProcess.start();
+
+            // start returns without the initial recovery being completed
+            // mimic ZK message about an added jobgraph while the recovery is ongoing
+            recoveryInitiatedLatch.await();
+            dispatcherLeaderProcess.onAddedJobGraph(dirtyJobResult.getJobId());
+            jobGraphAddedLatch.trigger();
+
+            assertThat(recoveredJobGraphsFuture)
+                    .succeedsWithin(Duration.ofHours(1))
+                    .satisfies(recoveredJobGraphs -> assertThat(recoveredJobGraphs).isEmpty());
+            assertThat(recoveredDirtyJobResultsFuture)
+                    .succeedsWithin(Duration.ofHours(1))
+                    .satisfies(
+                            recoveredDirtyJobResults ->
+                                    assertThat(recoveredDirtyJobResults)
+                                            .containsExactly(dirtyJobResult));
+        }
+    }
+
+    @Test
     public void closeAsync_stopsJobGraphStoreAndDispatcher() throws Exception {
         final CompletableFuture<Void> jobGraphStopFuture = new CompletableFuture<>();
         jobGraphStore =

Reply via email to