XComp commented on code in PR #22341:
URL: https://github.com/apache/flink/pull/22341#discussion_r1295642610
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -262,36 +261,92 @@ public void grantLeadership(UUID leaderSessionID) {
@GuardedBy("lock")
private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
sequentialOperation =
- sequentialOperation.thenRun(
- () ->
- runIfValidLeader(
- leaderSessionId,
- ThrowingRunnable.unchecked(
+ sequentialOperation
+ .thenCompose(
+ unused ->
+ supplyAsyncIfValidLeader(
+ leaderSessionId,
() ->
-
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(
-
leaderSessionId)),
- "verify job scheduling status and
create JobMasterServiceProcess"));
-
+
jobResultStore.hasJobResultEntryAsync(
+ getJobID()),
+ () ->
+
printLogIfNotValidLeader(
+ "verify job
result entry",
+
leaderSessionId)))
+ .handle(
+ (hasJobResult, throwable) -> {
+ if (throwable != null) {
+ ExceptionUtils.rethrow(throwable);
+ }
+ if (hasJobResult == null) {
+ return null;
+ }
+ if (hasJobResult) {
+
handleJobAlreadyDoneIfValidLeader(leaderSessionId);
+ } else {
+
createNewJobMasterServiceProcessIfValidLeader(
+ leaderSessionId);
+ }
+ return null;
+ });
handleAsyncOperationError(sequentialOperation, "Could not start the
job manager.");
}
- @GuardedBy("lock")
- private void
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(UUID leaderSessionId)
- throws FlinkException {
- try {
- if (jobResultStore.hasJobResultEntry(getJobID())) {
- jobAlreadyDone(leaderSessionId);
+ private <T> CompletableFuture<T> supplyAsyncIfValidLeader(
+ UUID expectedLeaderId,
+ Supplier<CompletableFuture<T>> supplier,
+ Supplier<CompletableFuture<T>> noLeaderFallback) {
+ final CompletableFuture<T> resultFuture = new CompletableFuture<>();
+ runIfValidLeader(
+ expectedLeaderId,
+ () -> FutureUtils.forward(supplier.get(), resultFuture),
+ () -> FutureUtils.forward(noLeaderFallback.get(),
resultFuture));
+
+ return resultFuture;
+ }
+
+ private void handleJobAlreadyDoneIfValidLeader(UUID leaderSessionId) {
+ runIfValidLeader(
+ leaderSessionId,
+ () -> {
+ jobAlreadyDone(leaderSessionId);
+ },
+ () -> printLogIfNotValidLeader("check completed job",
leaderSessionId));
+ }
+
+ private void createNewJobMasterServiceProcessIfValidLeader(UUID
leaderSessionId) {
+ runIfValidLeader(
+ leaderSessionId,
+ () -> {
+ ThrowingRunnable.unchecked(
+ () ->
createNewJobMasterServiceProcess(leaderSessionId))
+ .run();
+ },
+ () ->
+ printLogIfNotValidLeader(
+ "create new job master service process",
leaderSessionId));
+ }
+
+ private void runIfValidLeader(
Review Comment:
Moving the `runIfValidLeader` method up destroys the git history to some
degree (imagine someone wants to see the history of this specific method in
IntelliJ by selecting the method and using the "Git > Show history for
Selection..." feature). Instead, why not refactor the method in its original
location and add `supplyAsyncIfValidLeader` next to it?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -262,36 +261,92 @@ public void grantLeadership(UUID leaderSessionID) {
@GuardedBy("lock")
private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
sequentialOperation =
- sequentialOperation.thenRun(
- () ->
- runIfValidLeader(
- leaderSessionId,
- ThrowingRunnable.unchecked(
+ sequentialOperation
+ .thenCompose(
+ unused ->
+ supplyAsyncIfValidLeader(
+ leaderSessionId,
() ->
-
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(
-
leaderSessionId)),
- "verify job scheduling status and
create JobMasterServiceProcess"));
-
+
jobResultStore.hasJobResultEntryAsync(
+ getJobID()),
+ () ->
+
printLogIfNotValidLeader(
+ "verify job
result entry",
+
leaderSessionId)))
+ .handle(
+ (hasJobResult, throwable) -> {
+ if (throwable != null) {
+ ExceptionUtils.rethrow(throwable);
+ }
+ if (hasJobResult == null) {
+ return null;
+ }
+ if (hasJobResult) {
+
handleJobAlreadyDoneIfValidLeader(leaderSessionId);
+ } else {
+
createNewJobMasterServiceProcessIfValidLeader(
+ leaderSessionId);
+ }
+ return null;
+ });
handleAsyncOperationError(sequentialOperation, "Could not start the
job manager.");
}
- @GuardedBy("lock")
- private void
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(UUID leaderSessionId)
- throws FlinkException {
- try {
- if (jobResultStore.hasJobResultEntry(getJobID())) {
- jobAlreadyDone(leaderSessionId);
+ private <T> CompletableFuture<T> supplyAsyncIfValidLeader(
+ UUID expectedLeaderId,
+ Supplier<CompletableFuture<T>> supplier,
+ Supplier<CompletableFuture<T>> noLeaderFallback) {
+ final CompletableFuture<T> resultFuture = new CompletableFuture<>();
+ runIfValidLeader(
+ expectedLeaderId,
+ () -> FutureUtils.forward(supplier.get(), resultFuture),
+ () -> FutureUtils.forward(noLeaderFallback.get(),
resultFuture));
+
+ return resultFuture;
+ }
+
+ private void handleJobAlreadyDoneIfValidLeader(UUID leaderSessionId) {
+ runIfValidLeader(
+ leaderSessionId,
+ () -> {
+ jobAlreadyDone(leaderSessionId);
+ },
+ () -> printLogIfNotValidLeader("check completed job",
leaderSessionId));
+ }
+
+ private void createNewJobMasterServiceProcessIfValidLeader(UUID
leaderSessionId) {
+ runIfValidLeader(
+ leaderSessionId,
+ () -> {
+ ThrowingRunnable.unchecked(
+ () ->
createNewJobMasterServiceProcess(leaderSessionId))
+ .run();
+ },
+ () ->
+ printLogIfNotValidLeader(
+ "create new job master service process",
leaderSessionId));
+ }
+
+ private void runIfValidLeader(
+ UUID expectedLeaderId, Runnable action, Runnable noLeaderFallback)
{
+ synchronized (lock) {
+ if (isValidLeader(expectedLeaderId)) {
+ action.run();
} else {
- createNewJobMasterServiceProcess(leaderSessionId);
+ noLeaderFallback.run();
}
- } catch (IOException e) {
- throw new FlinkException(
- String.format(
- "Could not retrieve the job scheduling status for
job %s.", getJobID()),
- e);
}
}
+ private CompletableFuture<Boolean> printLogIfNotValidLeader(
+ String actionDescription, UUID leaderSessionId) {
+ LOG.trace(
Review Comment:
A bit out of scope, but: I feel like these log statements should be at debug
level. :thinking: WDYT? If you agree, we should fix it in a separate hotfix
commit.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -261,34 +261,69 @@ public void grantLeadership(UUID leaderSessionID) {
@GuardedBy("lock")
private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
- sequentialOperation =
- sequentialOperation.thenRun(
- () ->
- runIfValidLeader(
- leaderSessionId,
- ThrowingRunnable.unchecked(
- () ->
-
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(
-
leaderSessionId)),
- "verify job scheduling status and
create JobMasterServiceProcess"));
-
+ boolean isValid;
+ synchronized (lock) {
+ isValid = isValidLeader(leaderSessionId);
+ }
+ if (isValid) {
+ sequentialOperation =
+ sequentialOperation.thenCompose(
+ unused ->
+ runIfValidLeader(
+ leaderSessionId,
+ () ->
+
jobResultStore.hasJobResultEntryAsync(
+
getJobID()),
+ "verify jbb result entry")
+ .handle(
+ (hasJobResult, throwable)
-> {
+ if (hasJobResult ==
null) {
Review Comment:
~The `if (hasJobResult == null) {` block becomes obsolete here (because there
shouldn't be a situation where no result is passed to `handle(BiFunction)` if no
error happened in the upstream async calls). You could add a
`Preconditions.checkNotNull` if you like. But the subsequent `if` check would cause a
`NullPointerException` anyway, which makes the precondition obsolete again.~
~(... and a side remark here: we should at least add logs rather than only
returning `null` in the `if (hasJobResult == null) {` case.)~
OK, I misread the code here. The `null` check actually has a meaning here
because the upstream `supplyAsyncIfValidLeader` call returns a
`CompletableFuture` that completes successfully with `null` if leadership
is no longer held (the `noLeaderFallback` path). This functional dependency is
a bit hidden (IMHO). I see two options to improve the code:
1. Add a comment to the `if (hasJobResult == null) {` block explaining why it is
needed. I would also like to add a `@Nullable` annotation to
`printLogIfNotValidLeader` to acknowledge this. But I don't know how to handle such
a case with async calls: the `@Nullable` annotation becomes kind of ambiguous.
2. Treat the loss of leadership in `supplyAsyncIfValidLeader` as an
error scenario that is handled in the downstream `handle` call. We
can achieve this by returning an exceptionally completed future. You might want to
introduce a new exception `LeadershipLostException` that derives from
`LeaderElectionException` to cover that error scenario. You could then add the log
message in the `handle` method when handling `throwable != null`. Note that after
`thenCompose` the `throwable` passed to `handle` is typically wrapped in a
`CompletionException`, so unwrap it first (e.g. via
`ExceptionUtils.stripCompletionException(throwable)`) before the `instanceof` check,
something like:
```
if (throwable instanceof LeadershipLostException) {
// print your log statement
} else if (throwable != null) {
ExceptionUtils.rethrow(throwable);
}
```
I'm in favor of the 2nd solution because it makes the code more explicit
and, in this regard, easier to understand.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]