XComp commented on code in PR #28201:
URL: https://github.com/apache/flink/pull/28201#discussion_r3371877747
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -241,29 +254,83 @@ public boolean isInitialized() {
@Override
public void grantLeadership(UUID leaderSessionID) {
- runIfStateRunning(
- () -> startJobMasterServiceProcessAsync(leaderSessionID),
- "starting a new JobMasterServiceProcess");
+ synchronized (lock) {
Review Comment:
Why are you inlining the `runIfStateRunning` method here instead of keeping
it? We just have to edit the callback that's passed in, don't we?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -355,17 +426,48 @@ private void confirmLeadership(
private void forwardResultFuture(
UUID leaderSessionId, CompletableFuture<JobManagerRunnerResult>
resultFuture) {
resultFuture.whenComplete(
- (jobManagerRunnerResult, throwable) ->
- runIfValidLeader(
- leaderSessionId,
- () -> onJobCompletion(jobManagerRunnerResult,
throwable),
- "result future forwarding"));
+ (jobManagerRunnerResult, throwable) -> {
+ rememberGloballyTerminalResultIfCurrentProcess(
+ leaderSessionId, jobManagerRunnerResult);
+ runIfValidLeader(
+ leaderSessionId,
+ () -> onJobCompletion(jobManagerRunnerResult,
throwable),
+ "result future forwarding");
+ });
+ }
+
+ private void rememberGloballyTerminalResultIfCurrentProcess(
Review Comment:
```suggestion
private void cacheGloballyTerminalResultIfCurrentProcess(
```
nit: maybe stick to one term here.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -241,29 +254,83 @@ public boolean isInitialized() {
@Override
public void grantLeadership(UUID leaderSessionID) {
- runIfStateRunning(
- () -> startJobMasterServiceProcessAsync(leaderSessionID),
- "starting a new JobMasterServiceProcess");
+ synchronized (lock) {
+ if (!isRunning()) {
+ LOG.debug(
+ "Ignore 'starting a new JobMasterServiceProcess'
because the leadership runner is no longer running.");
+ return;
+ }
+ sequentialOperation =
+ sequentialOperation.thenCompose(
+ unused ->
flushPendingOrStartNewProcessAsync(leaderSessionID));
+ handleAsyncOperationError(sequentialOperation, "Could not start
the job manager.");
+ }
+ }
+
+ private CompletableFuture<Void> flushPendingOrStartNewProcessAsync(UUID
leaderSessionId) {
Review Comment:
```suggestion
private CompletableFuture<Void>
handledCachedGloballyTerminalJobOrStartNewProcessAsync(UUID leaderSessionId) {
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -355,17 +426,48 @@ private void confirmLeadership(
private void forwardResultFuture(
UUID leaderSessionId, CompletableFuture<JobManagerRunnerResult>
resultFuture) {
resultFuture.whenComplete(
- (jobManagerRunnerResult, throwable) ->
- runIfValidLeader(
- leaderSessionId,
- () -> onJobCompletion(jobManagerRunnerResult,
throwable),
- "result future forwarding"));
+ (jobManagerRunnerResult, throwable) -> {
+ rememberGloballyTerminalResultIfCurrentProcess(
Review Comment:
```suggestion
// The JobManagerRunnerResult needs to be cached for the
current process even if the leadership was lost.
rememberGloballyTerminalResultIfCurrentProcess(
```
We should make it explicit why we're not guarding this method call via the
leader election grant.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -355,17 +426,48 @@ private void confirmLeadership(
private void forwardResultFuture(
UUID leaderSessionId, CompletableFuture<JobManagerRunnerResult>
resultFuture) {
resultFuture.whenComplete(
- (jobManagerRunnerResult, throwable) ->
- runIfValidLeader(
- leaderSessionId,
- () -> onJobCompletion(jobManagerRunnerResult,
throwable),
- "result future forwarding"));
+ (jobManagerRunnerResult, throwable) -> {
+ rememberGloballyTerminalResultIfCurrentProcess(
+ leaderSessionId, jobManagerRunnerResult);
+ runIfValidLeader(
+ leaderSessionId,
+ () -> onJobCompletion(jobManagerRunnerResult,
throwable),
+ "result future forwarding");
+ });
+ }
+
+ private void rememberGloballyTerminalResultIfCurrentProcess(
Review Comment:
```suggestion
private void cacheGloballyTerminalResultIfCurrentProcess(
```
nit: maybe stick to one term here.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java:
##########
@@ -246,17 +249,26 @@ void
testJobMasterCreationFailureCompletesJobManagerRunnerWithInitializationErro
@Nonnull
private ExecutionGraphInfo createFailedExecutionGraphInfo(FlinkException
testException) {
+ return createExecutionGraphInfo(JobStatus.FAILED, testException);
+ }
+
+ @Nonnull
Review Comment:
nit: Let's remove the @Nonnull annotations instead of introducing a new one.
The non-null case should be the default one.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -241,29 +254,83 @@ public boolean isInitialized() {
@Override
public void grantLeadership(UUID leaderSessionID) {
- runIfStateRunning(
- () -> startJobMasterServiceProcessAsync(leaderSessionID),
- "starting a new JobMasterServiceProcess");
+ synchronized (lock) {
+ if (!isRunning()) {
+ LOG.debug(
+ "Ignore 'starting a new JobMasterServiceProcess'
because the leadership runner is no longer running.");
+ return;
+ }
+ sequentialOperation =
+ sequentialOperation.thenCompose(
+ unused ->
flushPendingOrStartNewProcessAsync(leaderSessionID));
+ handleAsyncOperationError(sequentialOperation, "Could not start
the job manager.");
+ }
+ }
+
+ private CompletableFuture<Void> flushPendingOrStartNewProcessAsync(UUID
leaderSessionId) {
+ final JobManagerRunnerResult cachedTerminalResult;
+ synchronized (lock) {
+ if (!isRunning()) {
+ return FutureUtils.completedVoidFuture();
+ }
+ cachedTerminalResult = takePendingTerminalResult();
+ if (cachedTerminalResult != null) {
+ state = State.JOB_COMPLETED;
+ }
+ }
+
+ if (cachedTerminalResult != null) {
+ LOG.info(
+ "Flushing previously observed globally terminal result for
job {} on re-grant; not starting a new {}. Job state: {}.",
+ getJobID(),
+ JobMasterServiceProcess.class.getSimpleName(),
+ cachedTerminalResult
+ .getExecutionGraphInfo()
+ .getArchivedExecutionGraph()
+ .getState());
+ resultFuture.complete(cachedTerminalResult);
+ return FutureUtils.completedVoidFuture();
+ }
+
+ return jobResultStore
+ .hasJobResultEntryAsync(getJobID())
+ .thenCompose(
+ hasJobResult ->
+ hasJobResult
+ ?
handleJobAlreadyDoneIfValidLeader(leaderSessionId)
+ :
createNewJobMasterServiceProcessIfValidLeader(
+ leaderSessionId));
}
@GuardedBy("lock")
- private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
- sequentialOperation =
- sequentialOperation.thenCompose(
- unused ->
- jobResultStore
- .hasJobResultEntryAsync(getJobID())
- .thenCompose(
- hasJobResult -> {
- if (hasJobResult) {
- return
handleJobAlreadyDoneIfValidLeader(
-
leaderSessionId);
- } else {
- return
createNewJobMasterServiceProcessIfValidLeader(
-
leaderSessionId);
- }
- }));
- handleAsyncOperationError(sequentialOperation, "Could not start the
job manager.");
+ private JobManagerRunnerResult takePendingTerminalResult() {
+ final JobManagerRunnerResult terminalResult = pendingTerminalResult;
+ pendingTerminalResult = null;
+ if (terminalResult != null) {
+ currentJobMasterServiceProcessLeaderId = null;
+ }
+ return terminalResult;
+ }
+
+ private void completeResultFutureAfterClose() {
+ JobManagerRunnerResult closeResult;
+ synchronized (lock) {
+ closeResult = takePendingTerminalResult();
+ if (closeResult == null) {
+ closeResult =
+ JobManagerRunnerResult.forSuccess(
+
createExecutionGraphInfoWithJobStatus(JobStatus.SUSPENDED));
+ currentJobMasterServiceProcessLeaderId = null;
Review Comment:
We reset `currentJobMasterServiceProcessLeaderId` here if `closeResult` is
null (i.e. no terminal result is cached), and in `takePendingTerminalResult` if
a terminal result is cached. Can we remove the reset from
`takePendingTerminalResult` and move the reset here out of the if block?
That makes `takePendingTerminalResult` a single-purpose method and makes the
code simpler.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -62,13 +62,21 @@
* <p>All leadership operations are serialized. This means that granting the
leadership has to
* complete before the leadership can be revoked and vice versa.
*
- * <p>The {@link #resultFuture} can be completed with the following values: * *
+ * <p>The {@link #resultFuture} can be completed with the following values:
*
* <ul>
* <li>{@link JobManagerRunnerResult} to signal an initialization failure of
the {@link
- * JobMasterService} or the completion of a job
+ * JobMasterService}, the completion of a job, or a globally terminal
result observed before
+ * leadership revocation could be forwarded
* <li>{@link Exception} to signal an unexpected failure
* </ul>
+ *
+ * <p>To close the race between a globally terminal result and a leadership
revocation that strips
+ * the forwarded result (see FLINK-39704), terminal results are cached in
{@link
+ * #pendingTerminalResult} the moment they are observed. The cache is
populated by {@link
+ * #rememberGloballyTerminalResultIfCurrentProcess}, drained by either {@link
#grantLeadership} (on
+ * re-grant) or {@link #completeResultFutureAfterClose} (on close), and
cleared by {@link
+ * #onJobCompletion} when forwarding succeeds normally.
Review Comment:
I feel like this paragraph shouldn't live in the class's Javadoc. It's more
of an internal detail. Can we move the comment next to the newly added fields
instead? WDYT?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -241,29 +254,83 @@ public boolean isInitialized() {
@Override
public void grantLeadership(UUID leaderSessionID) {
- runIfStateRunning(
- () -> startJobMasterServiceProcessAsync(leaderSessionID),
- "starting a new JobMasterServiceProcess");
+ synchronized (lock) {
+ if (!isRunning()) {
+ LOG.debug(
+ "Ignore 'starting a new JobMasterServiceProcess'
because the leadership runner is no longer running.");
+ return;
+ }
+ sequentialOperation =
+ sequentialOperation.thenCompose(
+ unused ->
flushPendingOrStartNewProcessAsync(leaderSessionID));
+ handleAsyncOperationError(sequentialOperation, "Could not start
the job manager.");
+ }
+ }
+
+ private CompletableFuture<Void> flushPendingOrStartNewProcessAsync(UUID
leaderSessionId) {
+ final JobManagerRunnerResult cachedTerminalResult;
+ synchronized (lock) {
+ if (!isRunning()) {
+ return FutureUtils.completedVoidFuture();
+ }
+ cachedTerminalResult = takePendingTerminalResult();
+ if (cachedTerminalResult != null) {
+ state = State.JOB_COMPLETED;
+ }
+ }
+
+ if (cachedTerminalResult != null) {
+ LOG.info(
+ "Flushing previously observed globally terminal result for
job {} on re-grant; not starting a new {}. Job state: {}.",
+ getJobID(),
+ JobMasterServiceProcess.class.getSimpleName(),
+ cachedTerminalResult
+ .getExecutionGraphInfo()
+ .getArchivedExecutionGraph()
+ .getState());
+ resultFuture.complete(cachedTerminalResult);
+ return FutureUtils.completedVoidFuture();
+ }
+
+ return jobResultStore
+ .hasJobResultEntryAsync(getJobID())
+ .thenCompose(
+ hasJobResult ->
+ hasJobResult
+ ?
handleJobAlreadyDoneIfValidLeader(leaderSessionId)
+ :
createNewJobMasterServiceProcessIfValidLeader(
+ leaderSessionId));
}
@GuardedBy("lock")
- private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
- sequentialOperation =
- sequentialOperation.thenCompose(
- unused ->
- jobResultStore
- .hasJobResultEntryAsync(getJobID())
- .thenCompose(
- hasJobResult -> {
- if (hasJobResult) {
- return
handleJobAlreadyDoneIfValidLeader(
-
leaderSessionId);
- } else {
- return
createNewJobMasterServiceProcessIfValidLeader(
-
leaderSessionId);
- }
- }));
- handleAsyncOperationError(sequentialOperation, "Could not start the
job manager.");
+ private JobManagerRunnerResult takePendingTerminalResult() {
+ final JobManagerRunnerResult terminalResult = pendingTerminalResult;
+ pendingTerminalResult = null;
+ if (terminalResult != null) {
+ currentJobMasterServiceProcessLeaderId = null;
+ }
+ return terminalResult;
+ }
+
+ private void completeResultFutureAfterClose() {
+ JobManagerRunnerResult closeResult;
+ synchronized (lock) {
+ closeResult = takePendingTerminalResult();
+ if (closeResult == null) {
+ closeResult =
+ JobManagerRunnerResult.forSuccess(
+
createExecutionGraphInfoWithJobStatus(JobStatus.SUSPENDED));
+ currentJobMasterServiceProcessLeaderId = null;
Review Comment:
```suggestion
// reset the leader ID to prevent a concurrent
rememberGloballyTerminalResultIfCurrentProcess call from updating the
JobManagerRunnerResult
currentJobMasterServiceProcessLeaderId = null;
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java:
##########
@@ -268,6 +268,34 @@ void testSuccessOnTerminalState() {
== JobStatus.FINISHED);
}
+ @Test
Review Comment:
test coverage: We lack a test which verifies that we do not update the
result if the leaderSessionId is wrong (essentially, the else branch of [this
if
condition](https://github.com/apache/flink/blob/3b56b6c4afaecf81e6130226bd0e54030656a627/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java#L445)).
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -355,17 +426,48 @@ private void confirmLeadership(
private void forwardResultFuture(
UUID leaderSessionId, CompletableFuture<JobManagerRunnerResult>
resultFuture) {
resultFuture.whenComplete(
- (jobManagerRunnerResult, throwable) ->
- runIfValidLeader(
- leaderSessionId,
- () -> onJobCompletion(jobManagerRunnerResult,
throwable),
- "result future forwarding"));
+ (jobManagerRunnerResult, throwable) -> {
+ rememberGloballyTerminalResultIfCurrentProcess(
+ leaderSessionId, jobManagerRunnerResult);
+ runIfValidLeader(
+ leaderSessionId,
+ () -> onJobCompletion(jobManagerRunnerResult,
throwable),
+ "result future forwarding");
+ });
+ }
+
+ private void rememberGloballyTerminalResultIfCurrentProcess(
+ UUID leaderSessionId, JobManagerRunnerResult
jobManagerRunnerResult) {
+ synchronized (lock) {
+ if (resultFuture.isDone()) {
+ return;
+ }
+ if (leaderSessionId.equals(currentJobMasterServiceProcessLeaderId)
+ && isGloballyTerminalResult(jobManagerRunnerResult)) {
Review Comment:
```suggestion
// initialization failures should still result in the
job being suspended - there is no need to cache the JobManagerRunnerResult;
the job startup is retried after failover
&& isGloballyTerminalResult(jobManagerRunnerResult)) {
```
It's probably worth adding some context on the reasoning here.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java:
##########
@@ -241,29 +254,83 @@ public boolean isInitialized() {
@Override
public void grantLeadership(UUID leaderSessionID) {
- runIfStateRunning(
- () -> startJobMasterServiceProcessAsync(leaderSessionID),
- "starting a new JobMasterServiceProcess");
+ synchronized (lock) {
+ if (!isRunning()) {
+ LOG.debug(
+ "Ignore 'starting a new JobMasterServiceProcess'
because the leadership runner is no longer running.");
+ return;
+ }
+ sequentialOperation =
+ sequentialOperation.thenCompose(
+ unused ->
flushPendingOrStartNewProcessAsync(leaderSessionID));
+ handleAsyncOperationError(sequentialOperation, "Could not start
the job manager.");
+ }
+ }
+
+ private CompletableFuture<Void> flushPendingOrStartNewProcessAsync(UUID
leaderSessionId) {
+ final JobManagerRunnerResult cachedTerminalResult;
+ synchronized (lock) {
+ if (!isRunning()) {
+ return FutureUtils.completedVoidFuture();
+ }
+ cachedTerminalResult = takePendingTerminalResult();
+ if (cachedTerminalResult != null) {
+ state = State.JOB_COMPLETED;
+ }
+ }
+
+ if (cachedTerminalResult != null) {
+ LOG.info(
+ "Flushing previously observed globally terminal result for
job {} on re-grant; not starting a new {}. Job state: {}.",
+ getJobID(),
+ JobMasterServiceProcess.class.getSimpleName(),
+ cachedTerminalResult
+ .getExecutionGraphInfo()
+ .getArchivedExecutionGraph()
+ .getState());
+ resultFuture.complete(cachedTerminalResult);
+ return FutureUtils.completedVoidFuture();
+ }
+
+ return jobResultStore
+ .hasJobResultEntryAsync(getJobID())
+ .thenCompose(
+ hasJobResult ->
+ hasJobResult
+ ?
handleJobAlreadyDoneIfValidLeader(leaderSessionId)
+ :
createNewJobMasterServiceProcessIfValidLeader(
+ leaderSessionId));
}
@GuardedBy("lock")
- private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
- sequentialOperation =
- sequentialOperation.thenCompose(
- unused ->
- jobResultStore
- .hasJobResultEntryAsync(getJobID())
- .thenCompose(
- hasJobResult -> {
- if (hasJobResult) {
- return
handleJobAlreadyDoneIfValidLeader(
-
leaderSessionId);
- } else {
- return
createNewJobMasterServiceProcessIfValidLeader(
-
leaderSessionId);
- }
- }));
- handleAsyncOperationError(sequentialOperation, "Could not start the
job manager.");
+ private JobManagerRunnerResult takePendingTerminalResult() {
+ final JobManagerRunnerResult terminalResult = pendingTerminalResult;
+ pendingTerminalResult = null;
+ if (terminalResult != null) {
+ currentJobMasterServiceProcessLeaderId = null;
Review Comment:
is resetting `currentJobMasterServiceProcessLeaderId` actually necessary or
just added for consistency purposes? 🤔
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]