This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new 63817b5 [FLINK-26680][coordination] Properly handle deleted jobs during recovery
63817b5 is described below
commit 63817b5ffdf7ba24a168aeec95464d13e4d78e13
Author: Chesnay Schepler <[email protected]>
AuthorDate: Thu Mar 17 11:22:26 2022 +0100
[FLINK-26680][coordination] Properly handle deleted jobs during recovery
---
.../runner/SessionDispatcherLeaderProcess.java | 14 +++--
.../runner/SessionDispatcherLeaderProcessTest.java | 66 ++++++++++++++++++++++
2 files changed, 76 insertions(+), 4 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java
index 6f9d0bf..6318700 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcess.java
@@ -147,7 +147,7 @@ public class SessionDispatcherLeaderProcess extends AbstractDispatcherLeaderProc
for (JobID jobId : jobIds) {
if (!recoveredDirtyJobResults.contains(jobId)) {
- recoveredJobGraphs.add(recoverJob(jobId));
+ tryRecoverJob(jobId).ifPresent(recoveredJobGraphs::add);
}
}
@@ -164,10 +164,16 @@ public class SessionDispatcherLeaderProcess extends AbstractDispatcherLeaderProc
}
}
- private JobGraph recoverJob(JobID jobId) {
+ private Optional<JobGraph> tryRecoverJob(JobID jobId) {
log.info("Trying to recover job with job id {}.", jobId);
try {
- return jobGraphStore.recoverJobGraph(jobId);
+ final JobGraph jobGraph = jobGraphStore.recoverJobGraph(jobId);
+ if (jobGraph == null) {
+ log.info(
+ "Skipping recovery of job with job id {}, because it already finished in a previous execution",
+ jobId);
+ }
+ return Optional.ofNullable(jobGraph);
} catch (Exception e) {
throw new FlinkRuntimeException(
String.format("Could not recover job with job id %s.", jobId), e);
@@ -264,7 +270,7 @@ public class SessionDispatcherLeaderProcess extends AbstractDispatcherLeaderProc
}
private Optional<JobGraph> recoverJobIfRunning(JobID jobId) {
- return supplyUnsynchronizedIfRunning(() -> recoverJob(jobId));
+ return supplyUnsynchronizedIfRunning(() -> tryRecoverJob(jobId)).flatMap(x -> x);
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java
index f48ca06..a90a245 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/SessionDispatcherLeaderProcessTest.java
@@ -46,6 +46,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -238,6 +239,71 @@ public class SessionDispatcherLeaderProcessTest {
}
@Test
+ public void testRecoveryWhileJobGraphRecoveryIsScheduledConcurrently() throws Exception {
+ final JobResult dirtyJobResult =
+ TestingJobResultStore.createSuccessfulJobResult(new JobID());
+
+ OneShotLatch recoveryInitiatedLatch = new OneShotLatch();
+ OneShotLatch jobGraphAddedLatch = new OneShotLatch();
+
+ jobGraphStore =
+ TestingJobGraphStore.newBuilder()
+ // mimic behavior when recovering a JobGraph that is marked for deletion
+ .setRecoverJobGraphFunction((jobId, jobs) -> null)
+ .build();
+
+ jobResultStore =
+ TestingJobResultStore.builder()
+ .withGetDirtyResultsSupplier(
+ () -> {
+ recoveryInitiatedLatch.trigger();
+ try {
+ jobGraphAddedLatch.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ return Collections.singleton(dirtyJobResult);
+ })
+ .build();
+
+ final CompletableFuture<Collection<JobGraph>> recoveredJobGraphsFuture =
+ new CompletableFuture<>();
+ final CompletableFuture<Collection<JobResult>> recoveredDirtyJobResultsFuture =
+ new CompletableFuture<>();
+ dispatcherServiceFactory =
+ (ignoredDispatcherId,
+ recoveredJobs,
+ recoveredDirtyJobResults,
+ ignoredJobGraphWriter,
+ ignoredJobResultStore) -> {
+ recoveredJobGraphsFuture.complete(recoveredJobs);
+ recoveredDirtyJobResultsFuture.complete(recoveredDirtyJobResults);
+ return TestingDispatcherGatewayService.newBuilder().build();
+ };
+
+ try (final SessionDispatcherLeaderProcess dispatcherLeaderProcess =
+ createDispatcherLeaderProcess()) {
+ dispatcherLeaderProcess.start();
+
+ // start returns without the initial recovery being completed
+ // mimic ZK message about an added jobgraph while the recovery is ongoing
+ recoveryInitiatedLatch.await();
+ dispatcherLeaderProcess.onAddedJobGraph(dirtyJobResult.getJobId());
+ jobGraphAddedLatch.trigger();
+
+ assertThat(recoveredJobGraphsFuture)
+ .succeedsWithin(Duration.ofHours(1))
+ .satisfies(recovedJobGraphs -> assertThat(recovedJobGraphs).isEmpty());
+ assertThat(recoveredDirtyJobResultsFuture)
+ .succeedsWithin(Duration.ofHours(1))
+ .satisfies(
+ recoveredDirtyJobResults ->
+ assertThat(recoveredDirtyJobResults)
+ .containsExactly(dirtyJobResult));
+ }
+ }
+
+ @Test
public void closeAsync_stopsJobGraphStoreAndDispatcher() throws Exception {
final CompletableFuture<Void> jobGraphStopFuture = new CompletableFuture<>();
jobGraphStore =