tillrohrmann commented on a change in pull request #15577:
URL: https://github.com/apache/flink/pull/15577#discussion_r613250217
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -180,26 +173,36 @@ private void handleInitializationFailure(Throwable
initializationFailure) {
*/
public CompletableFuture<Acknowledge> cancel(Time timeout) {
synchronized (lock) {
- if (isInitialized()) {
+ if (jobStatus.isJobManagerCreatedOrFailed()) {
return getJobMasterGateway()
.thenCompose(jobMasterGateway ->
jobMasterGateway.cancel(timeout));
} else {
log.info(
"Cancellation during initialization requested for job
{}. Job will be cancelled once JobManager has been initialized.",
jobId);
-
+ jobStatus.setCancelling();
// cancel job
CompletableFuture<Acknowledge> cancelFuture =
- jobManagerRunnerFuture
-
.thenCompose(JobManagerRunner::getJobMasterGateway)
+ getJobMasterGateway()
.thenCompose(jobMasterGateway ->
jobMasterGateway.cancel(timeout));
cancelFuture.whenComplete(
(ignored, cancelThrowable) -> {
- if (cancelThrowable != null) {
- log.warn("Cancellation of job {} failed",
jobId, cancelThrowable);
+ synchronized (lock) {
+ if (terminationFuture != null) {
+ // This DispatcherJob is pending a
termination. Forward
+ // cancellation result.
+ FutureUtils.forward(
+ cancelFuture.thenCompose((ign) ->
null),
Review comment:
I think this does not work and produces an NPE: the function passed to
`thenCompose` must return a `CompletionStage`, not `null`. Did you want to use
`cancelFuture.thenAccept()`?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
##########
@@ -299,14 +310,16 @@ public void grantLeadership(final UUID leaderSessionID) {
}
leadershipOperation =
- leadershipOperation.thenRun(
+ leadershipOperation.thenRunAsync(
ThrowingRunnable.unchecked(
() -> {
synchronized (lock) {
verifyJobSchedulingStatusAndStartJobManager(
leaderSessionID);
}
- }));
+ }),
+ executor); // run in separate thread to not block
main thread on
+ // JobManager initialization.
Review comment:
How does it happen that the main thread executor runs this part of the
code?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java
##########
@@ -211,6 +213,36 @@ public void testDispatcherJobResult() throws Exception {
containsString("Artificial failure"));
}
+ @Test
+ public void testJobNotFinishedException() {
+ TestContext testContext = createTestContext();
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ testContext.setRunning();
+
+ testContext.abortJob();
+
+ try {
+ dispatcherJob.getResultFuture().get();
+ } catch (Throwable t) {
+ assertThat(t, containsCause(JobNotFinishedException.class));
+ }
+ }
+
+ @Test
+ public void testLeadershipLoss() throws Exception {
Review comment:
It would be good to state in the test what should happen under leadership loss.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -43,110 +44,102 @@
import java.util.concurrent.CompletableFuture;
/** Abstraction used by the {@link Dispatcher} to manage jobs. */
-public final class DispatcherJob implements AutoCloseableAsync {
+public final class DispatcherJob implements AutoCloseableAsync,
JobManagerStatusListener {
- private final Logger log = LoggerFactory.getLogger(DispatcherJob.class);
+ private static final Logger log =
LoggerFactory.getLogger(DispatcherJob.class);
- private final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture;
private final CompletableFuture<DispatcherJobResult> jobResultFuture;
- private final CompletableFuture<Void> terminationFuture = new
CompletableFuture<>();
private final long initializationTimestamp;
+
private final JobID jobId;
+
private final String jobName;
+ // We need to guard access to the status field using this lock, because
the methods implemented
+ // by the JobManagerStatusListener might get called from any thread.
private final Object lock = new Object();
- // internal field to track job status during initialization. Is not
updated anymore after
- // job is initialized, cancelled or failed.
+ // if the termination future is set, we are signaling that this
DispatcherJob is closing / has
+ // been closed
Review comment:
This is not the embodiment of an expressive and clear contract.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherJobTest.java
##########
@@ -211,6 +213,36 @@ public void testDispatcherJobResult() throws Exception {
containsString("Artificial failure"));
}
+ @Test
+ public void testJobNotFinishedException() {
+ TestContext testContext = createTestContext();
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ testContext.setRunning();
+
+ testContext.abortJob();
+
+ try {
+ dispatcherJob.getResultFuture().get();
+ } catch (Throwable t) {
+ assertThat(t, containsCause(JobNotFinishedException.class));
+ }
+ }
+
+ @Test
+ public void testLeadershipLoss() throws Exception {
+ TestContext testContext = createTestContext();
+ DispatcherJob dispatcherJob = testContext.getDispatcherJob();
+
+ testContext.setRunning();
+
+ dispatcherJob.onJobManagerStopped();
+
+ assertThat(dispatcherJob.requestJobStatus(TIMEOUT).get(),
is(JobStatus.INITIALIZING));
+
+ testContext.setRunning();
Review comment:
Why is this necessary?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobStatus.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This class tracks the JobStatus for {@link DispatcherJob} with the
JobManagerRunner future. It is
+ * not thread safe.
+ */
+public class DispatcherJobStatus {
+ private Status status;
+
+ private CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = new
CompletableFuture<>();
+
+ public DispatcherJobStatus() {
+ status = Status.INITIALIZING;
+ }
+
+ public void setJobManagerCreated(JobManagerRunner runner) {
+ Preconditions.checkState(
+ status != Status.JOB_MANAGER_CREATED_OR_INIT_FAILED,
+ "JobManager has been created already");
+ this.jobManagerRunnerFuture.complete(runner);
+ status = Status.JOB_MANAGER_CREATED_OR_INIT_FAILED;
+ }
+
+ public void setJobManagerCreationFailed(Throwable failureCause) {
+ Preconditions.checkState(
+ status != Status.JOB_MANAGER_CREATED_OR_INIT_FAILED,
+ "JobManager has been created already");
+ this.jobManagerRunnerFuture.completeExceptionally(failureCause);
+ status = Status.JOB_MANAGER_CREATED_OR_INIT_FAILED;
+ }
+
+ public void setCancelling() {
+ Preconditions.checkState(status == Status.INITIALIZING, "JobManager
must be initializing");
+ status = Status.CANCELLING;
+ }
+
+ public void setInitializing() {
+ Preconditions.checkState(
+ status == Status.JOB_MANAGER_CREATED_OR_INIT_FAILED || status
== Status.CANCELLING,
+ "JobManager must be in state created or cancelling to go back
to initializing");
+ jobManagerRunnerFuture.completeExceptionally(
+ new FlinkException("Initializing new JobManager."));
+ jobManagerRunnerFuture = new CompletableFuture<>();
+ status = Status.INITIALIZING;
+ }
+
+ public JobStatus asJobStatus() {
+ if (status.getJobStatus() == null) {
+ throw new IllegalStateException("This state is not defined as a
'JobStatus'");
+ }
+ return status.getJobStatus();
+ }
+
+ public boolean isInitializing() {
+ return status == Status.INITIALIZING;
+ }
+
+ public boolean isJobManagerCreatedOrFailed() {
+ return status == Status.JOB_MANAGER_CREATED_OR_INIT_FAILED;
+ }
+
+ public boolean isCancelling() {
+ return status == Status.CANCELLING;
+ }
+
+ public CompletableFuture<JobManagerRunner> getJobManagerRunnerFuture() {
+ Preconditions.checkState(
+ !isInitializing(), "JobManagerRunner is not available during
initialization");
+ return jobManagerRunnerFuture;
+ }
+
+ private enum Status {
+ // We are waiting for the JobManager to be created
+ INITIALIZING(JobStatus.INITIALIZING),
+ JOB_MANAGER_CREATED_OR_INIT_FAILED(null),
+ // waiting for cancellation
+ CANCELLING(JobStatus.CANCELLING);
+
+ @Nullable private final JobStatus jobStatus;
+
+ Status(JobStatus jobStatus) {
Review comment:
`@Nullable` is missing.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -394,20 +394,33 @@ private void persistAndRunJob(JobGraph jobGraph) throws
Exception {
}
private void runJob(JobGraph jobGraph, ExecutionType executionType) {
-
Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
+ final JobID jobId = jobGraph.getJobID();
+ Preconditions.checkState(!runningJobs.containsKey(jobId));
long initializationTimestamp = System.currentTimeMillis();
- CompletableFuture<JobManagerRunner> jobManagerRunnerFuture =
- createJobManagerRunner(jobGraph, initializationTimestamp);
DispatcherJob dispatcherJob =
- DispatcherJob.createFor(
- jobManagerRunnerFuture,
- jobGraph.getJobID(),
- jobGraph.getName(),
- initializationTimestamp);
- runningJobs.put(jobGraph.getJobID(), dispatcherJob);
+ DispatcherJob.createFor(jobId, jobGraph.getName(),
initializationTimestamp);
+ runningJobs.put(jobId, dispatcherJob);
- final JobID jobId = jobGraph.getJobID();
+ try {
+ JobManagerRunner runner =
+ jobManagerRunnerFactory.createJobManagerRunner(
+ jobGraph,
+ configuration,
+ getRpcService(),
+ highAvailabilityServices,
+ heartbeatServices,
+ jobManagerSharedServices,
+ new
DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
+ fatalErrorHandler,
+ initializationTimestamp,
+ dispatcherJob,
+ getMainThreadExecutor());
+ runner.start();
+ } catch (Exception jobManagerRunnerException) {
+ dispatcherJob.onJobManagerInitializationFailed(
+ new JobExecutionException(jobId,
jobManagerRunnerException));
+ }
Review comment:
I don't understand this control flow. Can't we simply fail (by letting
the exception bubble up) if the synchronous generation of the
`JobManagerRunner` fails?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -394,20 +394,33 @@ private void persistAndRunJob(JobGraph jobGraph) throws
Exception {
}
private void runJob(JobGraph jobGraph, ExecutionType executionType) {
-
Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
+ final JobID jobId = jobGraph.getJobID();
+ Preconditions.checkState(!runningJobs.containsKey(jobId));
long initializationTimestamp = System.currentTimeMillis();
- CompletableFuture<JobManagerRunner> jobManagerRunnerFuture =
- createJobManagerRunner(jobGraph, initializationTimestamp);
DispatcherJob dispatcherJob =
- DispatcherJob.createFor(
- jobManagerRunnerFuture,
- jobGraph.getJobID(),
- jobGraph.getName(),
- initializationTimestamp);
- runningJobs.put(jobGraph.getJobID(), dispatcherJob);
+ DispatcherJob.createFor(jobId, jobGraph.getName(),
initializationTimestamp);
+ runningJobs.put(jobId, dispatcherJob);
Review comment:
It feels odd that we first create the `DispatcherJob` and then tell it
about the `JobManagerRunner` through some callback. Why not first create the
`JobManagerRunner` and then give it to the `DispatcherJob`?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
##########
@@ -367,27 +381,39 @@ private void startJobMasterServiceSafely(UUID
leaderSessionId) {
userCodeClassLoader,
initializationTimestamp);
+ // Execute listener notification asynchronously in the main thread
executor to make sure
+ // this "grant leadership" operation is properly completed before
the next operation is
+ // started. With the current implementation DispatcherJob might
call closeAsync()
+ // leading to concurrent access under the "lock".
+ FutureUtils.assertNoException(
+ CompletableFuture.runAsync(
+ () ->
jobManagerStatusListener.onJobManagerStarted(this),
+ mainThreadExecutor));
Review comment:
I think there is a design problem here. Somehow the `JobManagerRunner`
needs to know that some calls have to be executed by the `mainThreadExecutor`.
This looks like an implementation detail that leaks from the
`DispatcherJob` into the `JobManagerRunnerImpl`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJob.java
##########
@@ -212,20 +215,23 @@ private void handleInitializationFailure(Throwable
initializationFailure) {
executionGraphInfo.getArchivedExecutionGraph().getState());
}
+ boolean isJobMasterGatewayAvailable() {
+ synchronized (lock) {
+ return jobStatus.isJobManagerCreatedOrFailed();
+ }
+ }
+
/** Returns a future completing to the ExecutionGraphInfo of the job. */
public CompletableFuture<ExecutionGraphInfo> requestJob(Time timeout) {
Review comment:
I think this PR effectively suffers from the same problems as the
existing code. What happens if the client polls right after
`onJobManagerStarted` has been called? In this case, we will call
`getJobMasterGateway().thenCompose(gateway -> gateway.requestJob(timeout))`. If
the leader loses leadership before the call can be executed (note that the
leadership operation of the `JobManagerRunner` runs in a separate thread), then
it can happen that this call will never be answered or that you get a fresh
`leaderGatewayFuture` which in the future might be completed exceptionally if,
for example, the `JobMasterService` cannot be instantiated.
I think the underlying problem is that you try to maintain a consistent view
of the `JobManagerRunner` within the `DispatcherJob`, but the former runs
independently of the latter.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherJobStatus.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This class tracks the JobStatus for {@link DispatcherJob} with the
JobManagerRunner future. It is
+ * not thread safe.
+ */
+public class DispatcherJobStatus {
+ private Status status;
+
+ private CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = new
CompletableFuture<>();
Review comment:
Why does this need to be a future? If we transition into the
`Status.JOB_MANAGER_CREATED_OR_INIT_FAILED`, then the `JobManagerRunner` should
be known.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -350,9 +366,8 @@ public void testJobSubmission() throws Exception {
jobMasterLeaderElectionService.getStartFuture().get();
- assertTrue(
- "jobManagerRunner was not started",
- jobMasterLeaderElectionService.getStartFuture().isDone());
+ // complete JobManager initialization by granting leadership
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID()).get();
Review comment:
Why is this necessary? How is the test asserting that a job submission
creates a new `JobManagerRunner`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]