TisonKun commented on a change in pull request #9832: [FLINK-11843] Bind
lifespan of Dispatcher to leader session
URL: https://github.com/apache/flink/pull/9832#discussion_r337549852
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -869,78 +823,6 @@ private void jobMasterFailed(JobID jobId, Throwable
cause) {
return optionalJobInformation;
}
- //------------------------------------------------------
- // Leader contender
- //------------------------------------------------------
-
- /**
- * Callback method when current resourceManager is granted leadership.
- *
- * @param newLeaderSessionID unique leadershipID
- */
- @Override
- public void grantLeadership(final UUID newLeaderSessionID) {
- runAsyncWithoutFencing(
- () -> {
- log.info("Dispatcher {} was granted leadership
with fencing token {}", getAddress(), newLeaderSessionID);
-
- final CompletableFuture<Collection<JobGraph>>
recoveredJobsFuture = recoveryOperation.thenApplyAsync(
- FunctionUtils.uncheckedFunction(ignored
-> recoverJobs()),
- getRpcService().getExecutor());
-
- final CompletableFuture<Boolean>
fencingTokenFuture = recoveredJobsFuture.thenComposeAsync(
- (Collection<JobGraph> recoveredJobs) ->
tryAcceptLeadershipAndRunJobs(newLeaderSessionID, recoveredJobs),
- getUnfencedMainThreadExecutor());
-
- final CompletableFuture<Void>
confirmationFuture = fencingTokenFuture.thenCombineAsync(
- recoveredJobsFuture,
-
BiFunctionWithException.unchecked((Boolean confirmLeadership,
Collection<JobGraph> recoveredJobs) -> {
- if (confirmLeadership) {
-
leaderElectionService.confirmLeadership(newLeaderSessionID, getAddress());
- } else {
- for (JobGraph
recoveredJob : recoveredJobs) {
-
jobGraphStore.releaseJobGraph(recoveredJob.getJobID());
- }
- }
- return null;
- }),
- getRpcService().getExecutor());
-
- confirmationFuture.whenComplete(
- (Void ignored, Throwable throwable) -> {
- if (throwable != null) {
- onFatalError(
- new
DispatcherException(
-
String.format("Failed to take leadership with session id %s.",
newLeaderSessionID),
-
(ExceptionUtils.stripCompletionException(throwable))));
- }
- });
-
- recoveryOperation = confirmationFuture;
- });
- }
-
- private CompletableFuture<Boolean> tryAcceptLeadershipAndRunJobs(UUID
newLeaderSessionID, Collection<JobGraph> recoveredJobs) {
- final DispatcherId dispatcherId =
DispatcherId.fromUuid(newLeaderSessionID);
-
- if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
- log.debug("Dispatcher {} accepted leadership with
fencing token {}. Start recovered jobs.", getAddress(), dispatcherId);
- setNewFencingToken(dispatcherId);
-
- Collection<CompletableFuture<?>> runFutures = new
ArrayList<>(recoveredJobs.size());
-
- for (JobGraph recoveredJob : recoveredJobs) {
- final CompletableFuture<?> runFuture =
waitForTerminatingJobManager(recoveredJob.getJobID(), recoveredJob,
this::runJob);
- runFutures.add(runFuture);
- }
-
- return
FutureUtils.waitForAll(runFutures).thenApply(ignored -> true);
- } else {
- log.debug("Dispatcher {} lost leadership before
accepting it. Stop recovering jobs for fencing token {}.", getAddress(),
dispatcherId);
- return CompletableFuture.completedFuture(false);
- }
- }
-
private CompletableFuture<Void> waitForTerminatingJobManager(JobID
jobId, JobGraph jobGraph, FunctionWithException<JobGraph,
CompletableFuture<Void>, ?> action) {
Review comment:
It makes sense to defer the change, since it is not strictly required by this
issue.
Conceptually, we don't have to "wait for the previous terminating job manager":
since a Dispatcher is spawned per leader session, if a previous terminating
job manager exists, the job must have finished (successfully or unsuccessfully),
which means we should reject the later submission.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services