This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new a2925a0 [FLINK-19816] Make job state cleanup dependent on final job
result
a2925a0 is described below
commit a2925a0d2e894bf28aaced2993ec453589d143de
Author: Till Rohrmann <[email protected]>
AuthorDate: Thu Nov 12 18:45:35 2020 +0100
[FLINK-19816] Make job state cleanup dependent on final job result
In order to avoid race conditions between stopping a Dispatcher and a
finishing job, we now wait for the actual job result to decide whether to
clean up the job's HA data or not. A stopping Dispatcher will simply close
the DispatcherJobs and continue the cleanup operation once the job has been
terminated.
This closes #14055.
---
.../flink/runtime/dispatcher/Dispatcher.java | 113 +++++++++++----------
.../flink/runtime/dispatcher/MiniDispatcher.java | 12 ++-
.../runtime/jobmaster/JobManagerRunnerImpl.java | 7 +-
.../dispatcher/DispatcherResourceCleanupTest.java | 52 ++++++++++
.../runtime/jobmaster/TestingJobManagerRunner.java | 8 ++
5 files changed, 127 insertions(+), 65 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 55e2c3d..c78ffc8 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -397,6 +397,17 @@ public abstract class Dispatcher extends
PermanentlyFencedRpcEndpoint<Dispatcher
getMainThreadExecutor());
}
+ enum CleanupJobState {
+ LOCAL(false),
+ GLOBAL(true);
+
+ final boolean cleanupHAData;
+
+ CleanupJobState(boolean cleanupHAData) {
+ this.cleanupHAData = cleanupHAData;
+ }
+ }
+
private CompletableFuture<JobManagerRunner>
createJobManagerRunner(JobGraph jobGraph) {
final RpcService rpcService = getRpcService();
@@ -422,32 +433,33 @@ public abstract class Dispatcher extends
PermanentlyFencedRpcEndpoint<Dispatcher
private JobManagerRunner startJobManagerRunner(JobManagerRunner
jobManagerRunner) throws Exception {
final JobID jobId = jobManagerRunner.getJobID();
- FutureUtils.assertNoException(
- jobManagerRunner.getResultFuture().handleAsync(
- (ArchivedExecutionGraph archivedExecutionGraph,
Throwable throwable) -> {
- // check if we are still the active
JobManagerRunner by checking the identity
- final JobManagerRunner
currentJobManagerRunner =
Optional.ofNullable(jobManagerRunnerFutures.get(jobId))
- .map(future ->
future.getNow(null))
- .orElse(null);
- //noinspection ObjectEquality
- if (jobManagerRunner ==
currentJobManagerRunner) {
- if (archivedExecutionGraph !=
null) {
-
jobReachedGloballyTerminalState(archivedExecutionGraph);
- } else {
- final Throwable
strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
-
- if (strippedThrowable
instanceof JobNotFinishedException) {
-
jobNotFinished(jobId);
- } else {
-
jobMasterFailed(jobId, strippedThrowable);
- }
- }
+ final CompletableFuture<CleanupJobState> cleanupJobStateFuture
= jobManagerRunner.getResultFuture().handleAsync(
+ (ArchivedExecutionGraph archivedExecutionGraph,
Throwable throwable) -> {
+ // check if we are still the active
JobManagerRunner by checking the identity
+ final JobManagerRunner currentJobManagerRunner
= Optional.ofNullable(jobManagerRunnerFutures.get(jobId))
+ .map(future -> future.getNow(null))
+ .orElse(null);
+
+ Preconditions.checkState(jobManagerRunner ==
currentJobManagerRunner, "The runner entry in jobManagerRunnerFutures must be
bound to the lifetime of the JobManagerRunner.");
+ if (archivedExecutionGraph != null) {
+ return
jobReachedGloballyTerminalState(archivedExecutionGraph);
+ } else {
+ final Throwable strippedThrowable =
ExceptionUtils.stripCompletionException(throwable);
+
+ if (strippedThrowable instanceof
JobNotFinishedException) {
+ return jobNotFinished(jobId);
} else {
- log.debug("There is a newer
JobManagerRunner for the job {}.", jobId);
+ return jobMasterFailed(jobId,
strippedThrowable);
}
+ }
+ }, getMainThreadExecutor());
- return null;
- }, getMainThreadExecutor()));
+ final CompletableFuture<Void> jobTerminationFuture =
cleanupJobStateFuture
+ .thenApply(cleanupJobState -> removeJob(jobId,
cleanupJobState))
+ .thenCompose(Function.identity());
+
+ FutureUtils.assertNoException(jobTerminationFuture);
+ registerJobManagerRunnerTerminationFuture(jobId,
jobTerminationFuture);
jobManagerRunner.start();
@@ -666,22 +678,8 @@ public abstract class Dispatcher extends
PermanentlyFencedRpcEndpoint<Dispatcher
jobMasterGateway.deliverCoordinationRequestToCoordinator(operatorId,
serializedRequest, timeout));
}
- /**
- * Cleans up the job related data from the dispatcher. If cleanupHA is
true, then
- * the data will also be removed from HA.
- *
- * @param jobId JobID identifying the job to clean up
- * @param cleanupHA True iff HA data shall also be cleaned up
- */
- private void removeJobAndRegisterTerminationFuture(JobID jobId, boolean
cleanupHA) {
- final CompletableFuture<Void> cleanupFuture = removeJob(jobId,
cleanupHA);
-
- registerJobManagerRunnerTerminationFuture(jobId, cleanupFuture);
- }
-
private void registerJobManagerRunnerTerminationFuture(JobID jobId,
CompletableFuture<Void> jobManagerRunnerTerminationFuture) {
Preconditions.checkState(!jobManagerTerminationFutures.containsKey(jobId));
-
jobManagerTerminationFutures.put(jobId,
jobManagerRunnerTerminationFuture);
// clean up the pending termination future
@@ -697,18 +695,13 @@ public abstract class Dispatcher extends
PermanentlyFencedRpcEndpoint<Dispatcher
getMainThreadExecutor());
}
- private CompletableFuture<Void> removeJob(JobID jobId, boolean
cleanupHA) {
- CompletableFuture<JobManagerRunner> jobManagerRunnerFuture =
jobManagerRunnerFutures.remove(jobId);
+ private CompletableFuture<Void> removeJob(JobID jobId, CleanupJobState
cleanupJobState) {
+ final CompletableFuture<JobManagerRunner> job =
checkNotNull(jobManagerRunnerFutures.remove(jobId));
- final CompletableFuture<Void> jobManagerRunnerTerminationFuture;
- if (jobManagerRunnerFuture != null) {
- jobManagerRunnerTerminationFuture =
jobManagerRunnerFuture.thenCompose(JobManagerRunner::closeAsync);
- } else {
- jobManagerRunnerTerminationFuture =
CompletableFuture.completedFuture(null);
- }
+ final CompletableFuture<Void> jobTerminationFuture =
job.thenCompose(JobManagerRunner::closeAsync);
- return jobManagerRunnerTerminationFuture.thenRunAsync(
- () -> cleanUpJobData(jobId, cleanupHA),
+ return jobTerminationFuture.thenRunAsync(
+ () -> cleanUpJobData(jobId,
cleanupJobState.cleanupHAData),
getRpcService().getExecutor());
}
@@ -751,7 +744,15 @@ public abstract class Dispatcher extends
PermanentlyFencedRpcEndpoint<Dispatcher
final HashSet<JobID> jobsToRemove = new
HashSet<>(jobManagerRunnerFutures.keySet());
for (JobID jobId : jobsToRemove) {
- removeJobAndRegisterTerminationFuture(jobId, false);
+ terminateJob(jobId);
+ }
+ }
+
+ private void terminateJob(JobID jobId) {
+ final CompletableFuture<JobManagerRunner>
jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId);
+
+ if (jobManagerRunnerFuture != null) {
+
jobManagerRunnerFuture.thenCompose(JobManagerRunner::closeAsync);
}
}
@@ -765,7 +766,7 @@ public abstract class Dispatcher extends
PermanentlyFencedRpcEndpoint<Dispatcher
fatalErrorHandler.onFatalError(throwable);
}
- protected void jobReachedGloballyTerminalState(ArchivedExecutionGraph
archivedExecutionGraph) {
+ protected CleanupJobState
jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedExecutionGraph) {
Preconditions.checkArgument(
archivedExecutionGraph.getState().isGloballyTerminalState(),
"Job %s is in state %s which is not globally terminal.",
@@ -776,9 +777,7 @@ public abstract class Dispatcher extends
PermanentlyFencedRpcEndpoint<Dispatcher
archiveExecutionGraph(archivedExecutionGraph);
- final JobID jobId = archivedExecutionGraph.getJobID();
-
- removeJobAndRegisterTerminationFuture(jobId, true);
+ return CleanupJobState.GLOBAL;
}
private void archiveExecutionGraph(ArchivedExecutionGraph
archivedExecutionGraph) {
@@ -806,16 +805,18 @@ public abstract class Dispatcher extends
PermanentlyFencedRpcEndpoint<Dispatcher
});
}
- protected void jobNotFinished(JobID jobId) {
+ protected CleanupJobState jobNotFinished(JobID jobId) {
log.info("Job {} was not finished by JobManager.", jobId);
- removeJobAndRegisterTerminationFuture(jobId, false);
+ return CleanupJobState.LOCAL;
}
- private void jobMasterFailed(JobID jobId, Throwable cause) {
+ private CleanupJobState jobMasterFailed(JobID jobId, Throwable cause) {
// we fail fatally in case of a JobMaster failure in order to
restart the
// dispatcher to recover the jobs again. This only works in HA
mode, though
onFatalError(new FlinkException(String.format("JobMaster for
job %s failed.", jobId), cause));
+
+ return CleanupJobState.LOCAL;
}
private CompletableFuture<JobMasterGateway>
getJobMasterGatewayFuture(JobID jobId) {
@@ -900,7 +901,7 @@ public abstract class Dispatcher extends
PermanentlyFencedRpcEndpoint<Dispatcher
public CompletableFuture<Void> onRemovedJobGraph(JobID jobId) {
return CompletableFuture.runAsync(
- () -> removeJobAndRegisterTerminationFuture(jobId,
false),
+ () -> terminateJob(jobId),
getMainThreadExecutor());
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index b06453d..65f71b0 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -112,20 +112,24 @@ public class MiniDispatcher extends Dispatcher {
}
@Override
- protected void jobReachedGloballyTerminalState(ArchivedExecutionGraph
archivedExecutionGraph) {
- super.jobReachedGloballyTerminalState(archivedExecutionGraph);
+ protected CleanupJobState
jobReachedGloballyTerminalState(ArchivedExecutionGraph archivedExecutionGraph) {
+ final CleanupJobState cleanupHAState =
super.jobReachedGloballyTerminalState(archivedExecutionGraph);
if (jobCancelled || executionMode ==
ClusterEntrypoint.ExecutionMode.DETACHED) {
// shut down if job is cancelled or we don't have to
wait for the execution result retrieval
shutDownFuture.complete(ApplicationStatus.fromJobStatus(archivedExecutionGraph.getState()));
}
+
+ return cleanupHAState;
}
@Override
- protected void jobNotFinished(JobID jobId) {
- super.jobNotFinished(jobId);
+ protected CleanupJobState jobNotFinished(JobID jobId) {
+ final CleanupJobState cleanupJobState =
super.jobNotFinished(jobId);
// shut down since we have done our job
shutDownFuture.complete(ApplicationStatus.UNKNOWN);
+
+ return cleanupJobState;
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
index 4d750d4..adfafcf 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
@@ -194,6 +194,8 @@ public class JobManagerRunnerImpl implements
LeaderContender, OnCompletionAction
classLoaderLease.release();
+
resultFuture.completeExceptionally(new
JobNotFinishedException(jobGraph.getJobID()));
+
if (throwable != null) {
terminationFuture.completeExceptionally(
new
FlinkException("Could not properly shut down the JobManagerRunner", throwable));
@@ -201,11 +203,6 @@ public class JobManagerRunnerImpl implements
LeaderContender, OnCompletionAction
terminationFuture.complete(null);
}
});
-
- terminationFuture.whenComplete(
- (Void ignored, Throwable throwable) -> {
-
resultFuture.completeExceptionally(new
JobNotFinishedException(jobGraph.getJobID()));
- });
}
return terminationFuture;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
index fdbf165..2792f6e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
@@ -74,7 +74,11 @@ import javax.annotation.Nonnull;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.Optional;
+import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -314,6 +318,32 @@ public class DispatcherResourceCleanupTest extends
TestLogger {
assertThat(deleteAllHABlobsFuture.isDone(), is(false));
}
+ @Test
+ public void testHACleanupWhenJobFinishedWhileClosingDispatcher() throws
Exception {
+ final TestingJobManagerRunner testingJobManagerRunner = new
TestingJobManagerRunner(jobId, true);
+
+ final Queue<JobManagerRunner> jobManagerRunners = new
ArrayDeque<>(Arrays.asList(testingJobManagerRunner));
+
+ startDispatcher(new
QueueJobManagerRunnerFactory(jobManagerRunners));
+ submitJob();
+
+ final CompletableFuture<Void> dispatcherTerminationFuture =
dispatcher.closeAsync();
+
+ testingJobManagerRunner.waitUntilCloseAsyncIsBeingCalled();
+ testingJobManagerRunner.completeResultFuture(new
ArchivedExecutionGraphBuilder()
+ .setJobID(jobId)
+ .setState(JobStatus.FINISHED)
+ .build());
+
+ testingJobManagerRunner.completeTerminationFuture();
+
+ // check that no exceptions have been thrown
+ dispatcherTerminationFuture.get();
+
+ assertThat(cleanupJobFuture.get(), is(jobId));
+ assertThat(deleteAllHABlobsFuture.get(), is(jobId));
+ }
+
/**
* Tests that the {@link RunningJobsRegistry} entries are cleared after
the
* job reached a terminal state.
@@ -548,6 +578,28 @@ public class DispatcherResourceCleanupTest extends
TestLogger {
}
}
+ private static final class QueueJobManagerRunnerFactory implements
JobManagerRunnerFactory {
+ private final Queue<? extends JobManagerRunner>
jobManagerRunners;
+
+ private QueueJobManagerRunnerFactory(Queue<? extends
JobManagerRunner> jobManagerRunners) {
+ this.jobManagerRunners = jobManagerRunners;
+ }
+
+ @Override
+ public JobManagerRunner createJobManagerRunner(
+ JobGraph jobGraph,
+ Configuration configuration,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
+ JobManagerSharedServices jobManagerServices,
+ JobManagerJobMetricGroupFactory
jobManagerJobMetricGroupFactory,
+ FatalErrorHandler fatalErrorHandler) {
+ return Optional.ofNullable(jobManagerRunners.poll())
+ .orElseThrow(() -> new
IllegalStateException("Cannot create more JobManagerRunners."));
+ }
+ }
+
private class FailingJobManagerRunnerFactory implements
JobManagerRunnerFactory {
private final Exception testException;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
index 55cfde4..e8ed401 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.jobmaster;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import java.util.concurrent.CompletableFuture;
@@ -38,6 +39,8 @@ public class TestingJobManagerRunner implements
JobManagerRunner {
private final CompletableFuture<Void> terminationFuture;
+ private final OneShotLatch closeAsyncLatch = new OneShotLatch();
+
public TestingJobManagerRunner(JobID jobId) {
this(jobId, false);
}
@@ -72,6 +75,7 @@ public class TestingJobManagerRunner implements
JobManagerRunner {
@Override
public CompletableFuture<Void> closeAsync() {
+ closeAsyncLatch.trigger();
if (!blockingTermination) {
terminationFuture.complete(null);
}
@@ -91,6 +95,10 @@ public class TestingJobManagerRunner implements
JobManagerRunner {
terminationFuture.complete(null);
}
+ public void waitUntilCloseAsyncIsBeingCalled() throws
InterruptedException {
+ closeAsyncLatch.await();
+ }
+
public CompletableFuture<Void> getTerminationFuture() {
return terminationFuture;
}