tillrohrmann commented on a change in pull request #7565: [FLINK-11400]
Linearize leadership operations in JobManagerRunner
URL: https://github.com/apache/flink/pull/7565#discussion_r251455684
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
##########
@@ -277,38 +285,72 @@ public void grantLeadership(final UUID leaderSessionID) {
return;
}
- try {
-
verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
- } catch (Exception e) {
- handleJobManagerRunnerError(e);
- }
+ leadershipOperation = leadershipOperation.thenCompose(
+ (ignored) -> {
+ synchronized (lock) {
+ return
verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
+ }
+ });
+
+ handleException(leadershipOperation, "Could not start
the job manager.");
}
}
- private void verifyJobSchedulingStatusAndStartJobManager(UUID
leaderSessionId) throws Exception {
- final JobSchedulingStatus jobSchedulingStatus =
runningJobsRegistry.getJobSchedulingStatus(jobGraph.getJobID());
+ private CompletableFuture<Void>
verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
+ final CompletableFuture<JobSchedulingStatus>
jobSchedulingStatusFuture = getJobSchedulingStatus();
+
+ return jobSchedulingStatusFuture.thenCompose(
+ jobSchedulingStatus -> {
+ if (jobSchedulingStatus ==
JobSchedulingStatus.DONE) {
+ return jobAlreadyDone();
+ } else {
+ return startJobManager(leaderSessionId);
+ }
+ });
+ }
- if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
- log.info("Granted leader ship but job {} has been
finished. ", jobGraph.getJobID());
- jobFinishedByOther();
- } else {
- log.info("JobManager runner for job {} ({}) was granted
leadership with session id {} at {}.",
- jobGraph.getName(), jobGraph.getJobID(),
leaderSessionId, getAddress());
+ private CompletionStage<Void> startJobManager(UUID leaderSessionId) {
Review comment:
Good idea. Will change it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services