XComp commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1293129190
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -261,34 +261,69 @@ public void grantLeadership(UUID leaderSessionID) {
@GuardedBy("lock")
private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
- sequentialOperation =
- sequentialOperation.thenRun(
- () ->
- runIfValidLeader(
- leaderSessionId,
- ThrowingRunnable.unchecked(
- () ->
-
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(
-
leaderSessionId)),
- "verify job scheduling status and
create JobMasterServiceProcess"));
-
+ boolean isValid;
+ synchronized (lock) {
+ isValid = isValidLeader(leaderSessionId);
+ }
+ if (isValid) {
Review Comment:
```suggestion
```
This part is obsolete and can be removed. You can see that by checking the call
hierarchy of this method: it is called by `grantLeadership`, and only when the
runner is in the running state (and leadership has been granted).
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -261,34 +261,69 @@ public void grantLeadership(UUID leaderSessionID) {
@GuardedBy("lock")
private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
- sequentialOperation =
- sequentialOperation.thenRun(
- () ->
- runIfValidLeader(
- leaderSessionId,
- ThrowingRunnable.unchecked(
- () ->
-
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(
-
leaderSessionId)),
- "verify job scheduling status and
create JobMasterServiceProcess"));
-
+ boolean isValid;
+ synchronized (lock) {
+ isValid = isValidLeader(leaderSessionId);
+ }
+ if (isValid) {
+ sequentialOperation =
+ sequentialOperation.thenCompose(
+ unused ->
+ runIfValidLeader(
+ leaderSessionId,
+ () ->
+
jobResultStore.hasJobResultEntryAsync(
+
getJobID()),
+ "verify jbb result entry")
+ .handle(
+ (hasJobResult, throwable)
-> {
+ if (hasJobResult ==
null) {
Review Comment:
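This would swallow a potential `IOException` from the `hasJobResultEntryAsync`
call: if that future completes exceptionally, `hasJobResult` is `null`, the
`handle` callback simply returns `null`, and `throwable` is never inspected, so
the error is silently dropped. The old implementation handled these kinds of
errors. PTAL (please take a look).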
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -261,34 +261,69 @@ public void grantLeadership(UUID leaderSessionID) {
@GuardedBy("lock")
private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
- sequentialOperation =
- sequentialOperation.thenRun(
- () ->
- runIfValidLeader(
- leaderSessionId,
- ThrowingRunnable.unchecked(
- () ->
-
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(
-
leaderSessionId)),
- "verify job scheduling status and
create JobMasterServiceProcess"));
-
+ boolean isValid;
+ synchronized (lock) {
+ isValid = isValidLeader(leaderSessionId);
+ }
+ if (isValid) {
+ sequentialOperation =
+ sequentialOperation.thenCompose(
+ unused ->
+ runIfValidLeader(
+ leaderSessionId,
+ () ->
+
jobResultStore.hasJobResultEntryAsync(
+
getJobID()),
+ "verify jbb result entry")
+ .handle(
+ (hasJobResult, throwable)
-> {
+ if (hasJobResult ==
null) {
+ return null;
+ }
+ if (hasJobResult) {
+ runIfValidLeader(
+
leaderSessionId,
+ () -> {
+
jobAlreadyDone(
+
leaderSessionId);
+ return
CompletableFuture
+
.completedFuture(
+
null);
+ },
+ "check
completed job");
+ } else {
+ runIfValidLeader(
+
leaderSessionId,
+ () -> {
+
ThrowingRunnable.unchecked(
+
() ->
+
createNewJobMasterServiceProcess(
+
leaderSessionId))
+
.run();
+ return
CompletableFuture
+
.completedFuture(
+
null);
+ },
+ "create
new job master service process");
+ }
+ return null;
+ }));
+ } else {
+ LOG.trace(
+ "Ignore leader action '{}' because the leadership runner
is no longer the valid leader for {}.",
+ "verify job scheduling status and create
JobMasterServiceProcess",
+ leaderSessionId);
+ }
handleAsyncOperationError(sequentialOperation, "Could not start the
job manager.");
}
@GuardedBy("lock")
private void
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(UUID leaderSessionId)
Review Comment:
That method is not used anymore.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -261,34 +261,69 @@ public void grantLeadership(UUID leaderSessionID) {
@GuardedBy("lock")
private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
- sequentialOperation =
- sequentialOperation.thenRun(
- () ->
- runIfValidLeader(
- leaderSessionId,
- ThrowingRunnable.unchecked(
- () ->
-
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(
-
leaderSessionId)),
- "verify job scheduling status and
create JobMasterServiceProcess"));
-
+ boolean isValid;
+ synchronized (lock) {
+ isValid = isValidLeader(leaderSessionId);
+ }
+ if (isValid) {
+ sequentialOperation =
+ sequentialOperation.thenCompose(
+ unused ->
+ runIfValidLeader(
+ leaderSessionId,
+ () ->
+
jobResultStore.hasJobResultEntryAsync(
+
getJobID()),
+ "verify jbb result entry")
+ .handle(
Review Comment:
That works. But you could also call `handle` on the result of the
`thenCompose` call instead. That should improve readability because it avoids
so many levels of indentation. Another way to fix these indentation/readability
issues is to move the async code into its own private method with a meaningful
name.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -261,34 +261,69 @@ public void grantLeadership(UUID leaderSessionID) {
@GuardedBy("lock")
private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
- sequentialOperation =
- sequentialOperation.thenRun(
- () ->
- runIfValidLeader(
- leaderSessionId,
- ThrowingRunnable.unchecked(
- () ->
-
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(
-
leaderSessionId)),
- "verify job scheduling status and
create JobMasterServiceProcess"));
-
+ boolean isValid;
+ synchronized (lock) {
+ isValid = isValidLeader(leaderSessionId);
+ }
+ if (isValid) {
+ sequentialOperation =
+ sequentialOperation.thenCompose(
+ unused ->
+ runIfValidLeader(
+ leaderSessionId,
+ () ->
+
jobResultStore.hasJobResultEntryAsync(
+
getJobID()),
+ "verify jbb result entry")
Review Comment:
```suggestion
"verify job result entry")
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -488,18 +523,21 @@ private boolean isRunning() {
return state == State.RUNNING;
}
- private void runIfValidLeader(
- UUID expectedLeaderId, Runnable action, String actionDescription) {
+ private <T> CompletableFuture<T> runIfValidLeader(
Review Comment:
nit: This method should be called `supplyIfValidLeader`. With your current
implementation, a dedicated `runIfValidLeader` would still be helpful, though.
Hint: supplier-based async helpers and `Runnable`-based helpers can be expressed
in terms of each other; for example, the supplier-based variant can be built on
top of the `Runnable`-based one like this:
```
/**
 * Creates a result future and, depending on whether {@code expectedLeaderId} is still the valid
 * leader, forwards (via {@code FutureUtils.forward}) either the future produced by {@code
 * supplier} or the one produced by {@code noLeaderFallback} into it.
 *
 * @param expectedLeaderId leader session id that must still be valid for {@code supplier} to run
 * @param supplier produces the future to forward when the leader is valid
 * @param noLeaderFallback produces the future to forward when the leader is no longer valid
 * @return the future into which the selected supplier's future is forwarded
 */
private <T> CompletableFuture<T> supplyAsyncIfValidLeader(
        UUID expectedLeaderId,
        Supplier<CompletableFuture<T>> supplier,
        Supplier<CompletableFuture<T>> noLeaderFallback) {
    final CompletableFuture<T> forwardedResult = new CompletableFuture<>();

    final Runnable forwardFromSupplier =
            () -> FutureUtils.forward(supplier.get(), forwardedResult);
    final Runnable forwardFromFallback =
            () -> FutureUtils.forward(noLeaderFallback.get(), forwardedResult);

    runIfValidLeader(expectedLeaderId, forwardFromSupplier, forwardFromFallback);
    return forwardedResult;
}

/**
 * Runs {@code action} if {@code expectedLeaderId} is the valid leader, otherwise runs {@code
 * noLeaderFallback}. Both the leadership check and the selected runnable execute while holding
 * {@code lock}.
 */
private void runIfValidLeader(UUID expectedLeaderId, Runnable action, Runnable noLeaderFallback) {
    synchronized (lock) {
        final Runnable selected = isValidLeader(expectedLeaderId) ? action : noLeaderFallback;
        selected.run();
    }
}
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -261,34 +261,69 @@ public void grantLeadership(UUID leaderSessionID) {
@GuardedBy("lock")
private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
- sequentialOperation =
- sequentialOperation.thenRun(
- () ->
- runIfValidLeader(
- leaderSessionId,
- ThrowingRunnable.unchecked(
- () ->
-
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(
-
leaderSessionId)),
- "verify job scheduling status and
create JobMasterServiceProcess"));
-
+ boolean isValid;
+ synchronized (lock) {
+ isValid = isValidLeader(leaderSessionId);
+ }
+ if (isValid) {
+ sequentialOperation =
+ sequentialOperation.thenCompose(
+ unused ->
+ runIfValidLeader(
+ leaderSessionId,
+ () ->
+
jobResultStore.hasJobResultEntryAsync(
+
getJobID()),
+ "verify jbb result entry")
+ .handle(
+ (hasJobResult, throwable)
-> {
+ if (hasJobResult ==
null) {
+ return null;
+ }
+ if (hasJobResult) {
+ runIfValidLeader(
Review Comment:
I guess we could remove the code redundancy here by moving the
`runIfValidLeader` call out of the if clause. Alternatively, you could move the
content of the if and else blocks into their own private methods with meaningful
names (e.g. `handleJobAlreadyDoneIfValidLeader(leaderSessionID)` and
`createNewJobMasterServiceProcessIfValidLeader(leaderSessionID)`) to improve
this part of the code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]