tillrohrmann commented on a change in pull request #15577:
URL: https://github.com/apache/flink/pull/15577#discussion_r615761203
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
##########
@@ -55,4 +60,14 @@
* @return job id of the executed job
*/
JobID getJobID();
+
+ CompletableFuture<Acknowledge> cancel(Time timeout);
+
+ CompletableFuture<JobStatus> requestJobStatus(Time timeout);
+
+ CompletableFuture<JobDetails> requestJobDetails(Time timeout);
+
+ CompletableFuture<ExecutionGraphInfo> requestJob(Time timeout);
+
+ boolean isInitialized();
Review comment:
JavaDocs would be nice.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
##########
@@ -297,41 +399,54 @@ public void grantLeadership(final UUID leaderSessionID) {
"JobManagerRunner cannot be granted leadership because
it is already shut down.");
return;
}
+ this.currentLeaderSession = leaderSessionID;
+ // enqueue a leadership operation
leadershipOperation =
- leadershipOperation.thenRun(
- ThrowingRunnable.unchecked(
- () -> {
- synchronized (lock) {
-
verifyJobSchedulingStatusAndStartJobManager(
- leaderSessionID);
- }
- }));
+ leadershipOperation.thenCompose(
+ (ign) -> {
+ synchronized (lock) {
+ if (currentLeaderSession == null
+ || !currentLeaderSession.equals(leaderSessionID)) {
+ // lost leadership in the meantime, complete this operation
+ return CompletableFuture.completedFuture(null);
+ }
Review comment:
Nice :-)
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
##########
@@ -194,44 +207,133 @@ public void start() throws Exception {
shutdown = true;
setNewLeaderGatewayFuture();
- leaderGatewayFuture.completeExceptionally(
- new FlinkException("JobMaster has been shut down."));
-
- final CompletableFuture<Void> jobManagerTerminationFuture;
+ final FlinkException shutdownException =
+ new FlinkException("JobMaster has been shut down.");
+ leaderGatewayFuture.completeExceptionally(shutdownException);
+
+ if (jobStatus ==
JobManagerRunnerJobStatus.JOBMASTER_INITIALIZED) {
+ checkState(
+ jobMasterService != null,
+ "JobMaster service must be set when Job master is
initialized");
+ FutureUtils.assertNoException(
+ jobMasterService
+ .closeAsync()
+ .whenComplete(
+ (Void ignored, Throwable
throwable) ->
+
onJobManagerTermination(throwable)));
+ } else if (jobStatus.isInitializing()) {
+ if (jobStatus ==
JobManagerRunnerJobStatus.INITIALIZING_CANCELLING) {
+ checkState(cancelFuture != null);
+ cancelFuture.completeExceptionally(shutdownException);
+ }
- if (jobMasterService == null) {
- jobManagerTerminationFuture =
FutureUtils.completedVoidFuture();
- } else {
- jobManagerTerminationFuture =
jobMasterService.closeAsync();
+ if (currentLeaderSession == null) {
+ // no ongoing JobMaster initialization (waiting for leadership) --> close
+ onJobManagerTermination(null);
+ }
+ // ongoing initialization, we will finish closing once it is done.
Review comment:
Just looking at this method, it is not very clear to me how the
`terminationFuture` is completed in case of an ongoing initialization.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
##########
@@ -194,44 +207,133 @@ public void start() throws Exception {
shutdown = true;
setNewLeaderGatewayFuture();
- leaderGatewayFuture.completeExceptionally(
- new FlinkException("JobMaster has been shut down."));
-
- final CompletableFuture<Void> jobManagerTerminationFuture;
+ final FlinkException shutdownException =
+ new FlinkException("JobMaster has been shut down.");
+ leaderGatewayFuture.completeExceptionally(shutdownException);
+
+ if (jobStatus ==
JobManagerRunnerJobStatus.JOBMASTER_INITIALIZED) {
+ checkState(
+ jobMasterService != null,
+ "JobMaster service must be set when Job master is
initialized");
+ FutureUtils.assertNoException(
+ jobMasterService
+ .closeAsync()
+ .whenComplete(
+ (Void ignored, Throwable
throwable) ->
+
onJobManagerTermination(throwable)));
+ } else if (jobStatus.isInitializing()) {
+ if (jobStatus ==
JobManagerRunnerJobStatus.INITIALIZING_CANCELLING) {
+ checkState(cancelFuture != null);
+ cancelFuture.completeExceptionally(shutdownException);
+ }
- if (jobMasterService == null) {
- jobManagerTerminationFuture =
FutureUtils.completedVoidFuture();
- } else {
- jobManagerTerminationFuture =
jobMasterService.closeAsync();
+ if (currentLeaderSession == null) {
+ // no ongoing JobMaster initialization (waiting for leadership) --> close
+ onJobManagerTermination(null);
+ }
+ // ongoing initialization, we will finish closing once it is done.
}
+ }
- jobManagerTerminationFuture.whenComplete(
- (Void ignored, Throwable throwable) -> {
- try {
- leaderElectionService.stop();
- } catch (Throwable t) {
- throwable =
- ExceptionUtils.firstOrSuppressed(
- t,
-
ExceptionUtils.stripCompletionException(throwable));
- }
+ return terminationFuture;
Review comment:
Nit: I think it would be better if the completion of the
`terminationFuture` happens in this method. E.g. via using
`FutureUtils.forward(shutdownFuture, terminationFuture)`. That way one does not
have to look around where the termination future is actually completed.
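As a rough illustration of the forwarding idea, here is a minimal sketch using only the JDK. The `forward` method below is a hypothetical stand-in written for this example, not Flink's `FutureUtils.forward` implementation, and `shutdownFuture` is an assumed name for whatever future represents the completed shutdown steps.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a forwarding helper; not Flink's implementation.
final class ForwardSketch {

    /** Completes {@code target} with whatever outcome {@code source} ends up with. */
    static <T> void forward(CompletableFuture<T> source, CompletableFuture<T> target) {
        source.whenComplete(
                (value, failure) -> {
                    if (failure != null) {
                        target.completeExceptionally(failure);
                    } else {
                        target.complete(value);
                    }
                });
    }

    public static void main(String[] args) {
        CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
        CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

        // The single place where the termination future is wired up.
        forward(shutdownFuture, terminationFuture);

        shutdownFuture.complete(null);
        System.out.println("terminated: " + terminationFuture.isDone());
    }
}
```

With this shape, each branch of the close method would only need to produce a `shutdownFuture`, and the reader sees the completion of `terminationFuture` in one spot.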
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
##########
@@ -194,44 +207,133 @@ public void start() throws Exception {
shutdown = true;
setNewLeaderGatewayFuture();
- leaderGatewayFuture.completeExceptionally(
- new FlinkException("JobMaster has been shut down."));
-
- final CompletableFuture<Void> jobManagerTerminationFuture;
+ final FlinkException shutdownException =
+ new FlinkException("JobMaster has been shut down.");
+ leaderGatewayFuture.completeExceptionally(shutdownException);
+
+ if (jobStatus ==
JobManagerRunnerJobStatus.JOBMASTER_INITIALIZED) {
+ checkState(
+ jobMasterService != null,
+ "JobMaster service must be set when Job master is
initialized");
+ FutureUtils.assertNoException(
+ jobMasterService
+ .closeAsync()
+ .whenComplete(
+ (Void ignored, Throwable
throwable) ->
+
onJobManagerTermination(throwable)));
Review comment:
For chaining of shutdown operations there are some utilities like
`FutureUtils.runAfterwards` or so. This makes the handling of exceptions
occurring in the individual steps easier because they are automatically added
as suppressed exceptions.
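To make the suppressed-exception behaviour concrete, here is a small JDK-only sketch of such a chaining helper. `RunAfterwardsSketch`, `ThrowingAction`, and this `runAfterwards` are hypothetical names for illustration; the actual Flink utility may differ in signature and details.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical, simplified chaining helper; not Flink's implementation.
final class RunAfterwardsSketch {

    /** A cleanup step that is allowed to throw. */
    interface ThrowingAction {
        void run() throws Exception;
    }

    /** Runs {@code action} once {@code future} is done and merges both failures. */
    static CompletableFuture<Void> runAfterwards(
            CompletableFuture<?> future, ThrowingAction action) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        future.whenComplete(
                (ignored, failure) -> {
                    Throwable error = strip(failure);
                    try {
                        action.run();
                    } catch (Throwable t) {
                        if (error == null) {
                            error = t;
                        } else {
                            // Keep the first failure and attach the later one to it.
                            error.addSuppressed(t);
                        }
                    }
                    if (error != null) {
                        result.completeExceptionally(error);
                    } else {
                        result.complete(null);
                    }
                });
        return result;
    }

    // Unwraps the CompletionException that dependent stages may add.
    private static Throwable strip(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }
}
```

In the hunk above, the first future would be `jobMasterService.closeAsync()` and the follow-up action would be the step that stops the leader election service, so a failure in either step ends up on one exception.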
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
##########
@@ -194,44 +207,133 @@ public void start() throws Exception {
shutdown = true;
setNewLeaderGatewayFuture();
- leaderGatewayFuture.completeExceptionally(
- new FlinkException("JobMaster has been shut down."));
-
- final CompletableFuture<Void> jobManagerTerminationFuture;
+ final FlinkException shutdownException =
+ new FlinkException("JobMaster has been shut down.");
+ leaderGatewayFuture.completeExceptionally(shutdownException);
+
+ if (jobStatus ==
JobManagerRunnerJobStatus.JOBMASTER_INITIALIZED) {
+ checkState(
+ jobMasterService != null,
+ "JobMaster service must be set when Job master is
initialized");
+ FutureUtils.assertNoException(
+ jobMasterService
+ .closeAsync()
+ .whenComplete(
+ (Void ignored, Throwable
throwable) ->
+
onJobManagerTermination(throwable)));
+ } else if (jobStatus.isInitializing()) {
+ if (jobStatus ==
JobManagerRunnerJobStatus.INITIALIZING_CANCELLING) {
+ checkState(cancelFuture != null);
+ cancelFuture.completeExceptionally(shutdownException);
Review comment:
Why don't we have to close the `leaderElectionService` in case of a
cancellation? Same for the `resultFuture`. What is the semantics for the
`resultFuture` when the `JobManagerRunner` is cancelled?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
##########
@@ -348,17 +463,76 @@ private void startJobMaster(UUID leaderSessionId) throws
FlinkException {
e);
}
- startJobMasterServiceSafely(leaderSessionId);
+ // run blocking JobMaster initialization outside of the lock
+ CompletableFuture<JobMasterService> jobMasterStartFuture =
+ CompletableFuture.supplyAsync(
+ () -> startJobMasterServiceSafely(leaderSessionId),
executor);
+
+ return jobMasterStartFuture.thenCompose(
+ (newJobMasterService) -> {
+ synchronized (lock) {
+ if (newJobMasterService == null) {
Review comment:
I would suggest encoding the initialization failure more explicitly and
not only via a `null` value.
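One possible shape for this, sketched with plain Java: a small result type that carries either the created service or the failure cause. `InitializationResult` and its methods are hypothetical names for this example and do not exist in the PR.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical result type; an illustration, not part of the PR.
final class InitializationResult<T> {

    private final T service; // set on success only
    private final Throwable failure; // set on failure only

    private InitializationResult(T service, Throwable failure) {
        this.service = service;
        this.failure = failure;
    }

    static <T> InitializationResult<T> success(T service) {
        return new InitializationResult<>(Objects.requireNonNull(service), null);
    }

    static <T> InitializationResult<T> failure(Throwable cause) {
        return new InitializationResult<>(null, Objects.requireNonNull(cause));
    }

    boolean isSuccess() {
        return failure == null;
    }

    /** Returns the service, or throws if the initialization failed. */
    T getService() {
        if (!isSuccess()) {
            throw new IllegalStateException("Initialization failed.", failure);
        }
        return service;
    }

    /** The original failure cause, if any, so it is not lost on the way. */
    Optional<Throwable> getFailure() {
        return Optional.ofNullable(failure);
    }
}
```

The callback in `thenCompose` could then branch on `isSuccess()` instead of a `null` check, and the failure cause would travel with the result. Completing the start future exceptionally would be another way to get the same effect.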
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImplTest.java
##########
@@ -290,9 +302,286 @@ public void
testJobMasterCreationFailureCompletesJobManagerRunnerWithInitializat
assertTrue(
jobManagerRunnerResult.getInitializationFailure()
instanceof JobInitializationException);
+ assertThat(jobManagerRunnerResult.getInitializationFailure(),
containsCause(testException));
+ }
+
+ @Test
+ public void
testJobMasterShutDownOnRunnerShutdownDuringJobMasterInitialization()
+ throws Exception {
+ final BlockingJobMasterServiceFactory blockingJobMasterServiceFactory =
+ new BlockingJobMasterServiceFactory();
+
+ final JobManagerRunner jobManagerRunner =
+ createJobManagerRunner(blockingJobMasterServiceFactory);
+
+ jobManagerRunner.start();
+
+ leaderElectionService.isLeader(UUID.randomUUID());
+
+ TestingJobMasterService testingJobMasterService =
+ blockingJobMasterServiceFactory.waitForBlockingOnInit();
+
+ CompletableFuture<Void> closeFuture = jobManagerRunner.closeAsync();
+
+ blockingJobMasterServiceFactory.unblock();
+
+ closeFuture.get();
+
+ assertJobNotFinished(jobManagerRunner.getResultFuture());
+
+ assertThat(testingJobMasterService.isClosed(), is(true));
+ }
+
+ @Test
+ public void testJobMasterShutdownOnLeadershipLossDuringInitialization()
throws Exception {
+ final BlockingJobMasterServiceFactory blockingJobMasterServiceFactory =
+ new BlockingJobMasterServiceFactory();
+
+ final JobManagerRunner jobManagerRunner =
+ createJobManagerRunner(blockingJobMasterServiceFactory);
+
+ jobManagerRunner.start();
+
+ leaderElectionService.isLeader(UUID.randomUUID());
+
+ TestingJobMasterService testingJobMasterService =
+ blockingJobMasterServiceFactory.waitForBlockingOnInit();
+
+ leaderElectionService.notLeader();
+
+ blockingJobMasterServiceFactory.unblock();
+
+ // assert termination of testingJobMaster
+ testingJobMasterService.getTerminationFuture().get();
+ assertThat(testingJobMasterService.isClosed(), is(true));
+ }
+
+ @Test
+ public void testJobCancellationOnCancellationDuringInitialization() throws
Exception {
+ AtomicBoolean cancelCalled = new AtomicBoolean(false);
+ JobMasterGateway jobMasterGateway =
+ new TestingJobMasterGatewayBuilder()
+ .setCancelFunction(
+ () -> {
+ cancelCalled.set(true);
+ return
CompletableFuture.completedFuture(Acknowledge.get());
+ })
+ .build();
+
+ TestingJobMasterService testingJobMasterService =
+ new TestingJobMasterService(jobMasterGateway);
+ final BlockingJobMasterServiceFactory blockingJobMasterServiceFactory =
+ new BlockingJobMasterServiceFactory(() ->
testingJobMasterService);
+
+ final JobManagerRunner jobManagerRunner =
+ createJobManagerRunner(blockingJobMasterServiceFactory);
+
+ jobManagerRunner.start();
+
+ leaderElectionService.isLeader(UUID.randomUUID());
+
+ blockingJobMasterServiceFactory.waitForBlockingOnInit();
+
+ // cancel during init
+ CompletableFuture<Acknowledge> cancellationFuture =
+ jobManagerRunner.cancel(TESTING_TIMEOUT);
+
+ assertThat(cancellationFuture.isDone(), is(false));
+
+ blockingJobMasterServiceFactory.unblock();
+
+ // assert that cancellation future completes when cancellation
completes.
+ cancellationFuture.get();
+ assertThat(cancelCalled.get(), is(true));
+ }
+
+ @Test
+ public void testJobInformationOperationsDuringInitialization() throws
Exception {
+ final BlockingJobMasterServiceFactory blockingJobMasterServiceFactory =
+ new BlockingJobMasterServiceFactory();
+
+ final JobManagerRunner jobManagerRunner =
+ createJobManagerRunner(blockingJobMasterServiceFactory);
+
+ jobManagerRunner.start();
+
+ // assert initializing while waiting for leadership
+ assertInitializingStates(jobManagerRunner);
+
+ // assign leadership
+ leaderElectionService.isLeader(UUID.randomUUID());
+
+ // assert initializing while JobMaster is blocked
+ assertInitializingStates(jobManagerRunner);
+ blockingJobMasterServiceFactory.unblock();
+ }
+
+ private static void assertInitializingStates(JobManagerRunner
jobManagerRunner)
+ throws ExecutionException, InterruptedException {
+ assertThat(
+ jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get(),
+ is(JobStatus.INITIALIZING));
+ assertThat(jobManagerRunner.getResultFuture().isDone(), is(false));
assertThat(
- jobManagerRunnerResult.getInitializationFailure(),
- FlinkMatchers.containsCause(testException));
+ jobManagerRunner
+ .requestJob(TESTING_TIMEOUT)
+ .get()
+ .getArchivedExecutionGraph()
+ .getState(),
+ is(JobStatus.INITIALIZING));
+
+ assertThat(
+
jobManagerRunner.requestJobDetails(TESTING_TIMEOUT).get().getStatus(),
+ is(JobStatus.INITIALIZING));
+ }
+
+ @Test
+ public void testShutdownInInitializedState() throws Exception {
+ final JobManagerRunnerImpl jobManagerRunner = createJobManagerRunner();
+ jobManagerRunner.start();
+ // grant leadership to finish initialization
+ leaderElectionService.isLeader(UUID.randomUUID()).get();
+
+ assertThat(jobManagerRunner.isInitialized(), is(true));
+
+ jobManagerRunner.close();
+
+ assertJobNotFinished(jobManagerRunner.getResultFuture());
+ }
+
+ @Test
+ public void testShutdownWhileWaitingForCancellationDuringInitialization()
throws Exception {
+ final BlockingJobMasterServiceFactory blockingJobMasterServiceFactory =
+ new BlockingJobMasterServiceFactory();
+
+ final JobManagerRunner jobManagerRunner =
+ createJobManagerRunner(blockingJobMasterServiceFactory);
+
+ jobManagerRunner.start();
+
+ leaderElectionService.isLeader(UUID.randomUUID());
+
+ blockingJobMasterServiceFactory.waitForBlockingOnInit();
+
+ // cancel while initializing
+ assertThat(
+ jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get(),
+ is(JobStatus.INITIALIZING));
+
+ CompletableFuture<Acknowledge> cancelFuture =
jobManagerRunner.cancel(TESTING_TIMEOUT);
+ assertThat(cancelFuture.isDone(), is(false));
+
+ CompletableFuture<Void> closeFuture = jobManagerRunner.closeAsync();
+ assertThat(closeFuture.isDone(), is(false));
+
+ // the close operation finishes only once the initialization finishes
+ blockingJobMasterServiceFactory.unblock();
+
+ assertThat(cancelFuture.isCompletedExceptionally(), is(true));
+ assertJobNotFinished(jobManagerRunner.getResultFuture());
+ }
+
+ @Test
+ public void testCancellationAfterInitialization() throws Exception {
+ AtomicBoolean cancelCalled = new AtomicBoolean(false);
+ JobMasterGateway testingGateway =
+ new TestingJobMasterGatewayBuilder()
+ .setCancelFunction(
+ () -> {
+ cancelCalled.set(true);
+ return
CompletableFuture.completedFuture(Acknowledge.get());
+ })
+ .build();
+ TestingJobMasterServiceFactory jobMasterServiceFactory =
+ new TestingJobMasterServiceFactory(
+ () -> new TestingJobMasterService(testingGateway));
+ final JobManagerRunnerImpl jobManagerRunner =
+ createJobManagerRunner(jobMasterServiceFactory);
+ jobManagerRunner.start();
+ // grant leadership to finish initialization
+ leaderElectionService.isLeader(UUID.randomUUID()).get();
+
+ assertThat(jobManagerRunner.isInitialized(), is(true));
+
+ jobManagerRunner.cancel(TESTING_TIMEOUT).get();
+ assertThat(cancelCalled.get(), is(true));
+ }
+
+ // It can happen that a series of leadership operations happens while the
JobMaster
+ // initialization is blocked. This test is to ensure that we are not
starting-stopping
+ // JobMasters for all pending leadership grants, but only for the latest.
+ @Test
+ public void testSkippingOfEnqueuedLeadershipOperations() throws Exception {
+ final BlockingJobMasterServiceFactory blockingJobMasterServiceFactory =
+ new BlockingJobMasterServiceFactory();
+
+ final JobManagerRunner jobManagerRunner =
+ createJobManagerRunner(blockingJobMasterServiceFactory);
+
+ jobManagerRunner.start();
+
+ // first leadership assignment to get into blocking initialization
+ leaderElectionService.isLeader(UUID.randomUUID());
+
+ blockingJobMasterServiceFactory.waitForBlockingOnInit();
+
+ // we are now blocked on the initialization, enqueue some operations:
+ for (int i = 0; i < 10; i++) {
+ leaderElectionService.notLeader();
+ leaderElectionService.isLeader(UUID.randomUUID());
+ }
+
+ blockingJobMasterServiceFactory.unblock();
+
+ // wait until the second JobMaster has been created
+ blockingJobMasterServiceFactory.waitForBlockingOnInit();
+
+ assertThat(
+
blockingJobMasterServiceFactory.getNumberOfJobMasterInstancesCreated(),
equalTo(2));
+ }
+
+ @Test
+ public void testCancellationFailsWhenInitializationFails() throws
Exception {
Review comment:
Is this the behaviour we want to have? Alternatively, one could argue
that an initialization failure is a kind of shortcut for shutting the
`JobMasterService` down.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
##########
@@ -297,41 +399,54 @@ public void grantLeadership(final UUID leaderSessionID) {
"JobManagerRunner cannot be granted leadership because
it is already shut down.");
return;
}
+ this.currentLeaderSession = leaderSessionID;
+ // enqueue a leadership operation
leadershipOperation =
- leadershipOperation.thenRun(
- ThrowingRunnable.unchecked(
- () -> {
- synchronized (lock) {
-
verifyJobSchedulingStatusAndStartJobManager(
- leaderSessionID);
- }
- }));
+ leadershipOperation.thenCompose(
+ (ign) -> {
+ synchronized (lock) {
+ if (currentLeaderSession == null
+ || !currentLeaderSession.equals(leaderSessionID)) {
+ // lost leadership in the meantime, complete this operation
+ return CompletableFuture.completedFuture(null);
+ }
+ try {
+ return verifyJobSchedulingStatusAndStartJobManager(
+ leaderSessionID);
+ } catch (FlinkException e) {
+ ExceptionUtils.rethrow(e);
+ }
+ }
+ return null;
Review comment:
I would suggest moving this return into the `catch` block. This makes
it a bit clearer that this is only relevant for this case.
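A stripped-down sketch of the suggested layout, with hypothetical names (`rethrow`, `startOperation`, and `runLeadershipOperation` are placeholders and not the methods from the PR):

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified version of the control flow under review.
final class ReturnInCatchSketch {

    // Stand-in for a rethrow helper: it always throws, but the compiler
    // cannot see that from the call site.
    static void rethrow(Throwable t) {
        throw new IllegalStateException(t);
    }

    static CompletableFuture<Void> startOperation(boolean fail) throws Exception {
        if (fail) {
            throw new Exception("start failed");
        }
        return CompletableFuture.completedFuture(null);
    }

    static CompletableFuture<Void> runLeadershipOperation(boolean fail) {
        try {
            return startOperation(fail);
        } catch (Exception e) {
            rethrow(e);
            // Only needed to satisfy the compiler; never reached at runtime.
            return null;
        }
    }
}
```

Keeping the `return null` next to the rethrow call shows that it exists only for that branch.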
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
##########
@@ -348,17 +463,76 @@ private void startJobMaster(UUID leaderSessionId) throws
FlinkException {
e);
}
- startJobMasterServiceSafely(leaderSessionId);
+ // run blocking JobMaster initialization outside of the lock
+ CompletableFuture<JobMasterService> jobMasterStartFuture =
+ CompletableFuture.supplyAsync(
+ () -> startJobMasterServiceSafely(leaderSessionId),
executor);
+
+ return jobMasterStartFuture.thenCompose(
+ (newJobMasterService) -> {
+ synchronized (lock) {
+ if (newJobMasterService == null) {
Review comment:
How do we record the exact failure cause?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
##########
@@ -348,17 +462,76 @@ private void startJobMaster(UUID leaderSessionId) throws
FlinkException {
e);
}
Review comment:
Wouldn't this also qualify as an initialization failure if we cannot set
the correct state in the `runningJobsRegistry`?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
##########
@@ -348,17 +462,76 @@ private void startJobMaster(UUID leaderSessionId) throws
FlinkException {
e);
}
- startJobMasterServiceSafely(leaderSessionId);
+ // run blocking JobMaster initialization outside of the lock
+ CompletableFuture<JobMasterService> jobMasterStartFuture =
+ CompletableFuture.supplyAsync(
+ () -> startJobMasterServiceSafely(leaderSessionId),
executor);
+
+ return jobMasterStartFuture.thenCompose(
+ (newJobMasterService) -> {
+ synchronized (lock) {
+ if (newJobMasterService == null) {
+ // initialization failed
+ if (jobStatus ==
JobManagerRunnerJobStatus.INITIALIZING_CANCELLING) {
+ checkState(cancelFuture != null);
+ cancelFuture.completeExceptionally(
+ new FlinkException(
+ "Cancellation failed because
JobMaster initialization failed"));
+ }
+ jobStatus =
JobManagerRunnerJobStatus.JOBMASTER_INITIALIZED;
+ return CompletableFuture.completedFuture(null);
+ } else {
+ return onJobMasterInitializationCompletion(
+ newJobMasterService, leaderSessionId);
+ }
+ }
+ });
+ }
- if (jobMasterService != null) {
- confirmLeaderSessionIdIfStillLeader(jobMasterService,
leaderSessionId);
+ // JobMaster initialization is completed. Ensure proper state and
leadership
+ @GuardedBy("lock")
+ private CompletableFuture<Void> onJobMasterInitializationCompletion(
+ JobMasterService newJobMasterService, UUID leaderSessionId) {
+ checkNotNull(newJobMasterService);
+ if (shutdown) {
+ return newJobMasterService
+ .closeAsync()
+ .whenComplete(
+ (Void ignored, Throwable throwable) ->
+ onJobManagerTermination(throwable));
}
- }
- private void startJobMasterServiceSafely(UUID leaderSessionId) {
- checkState(jobMasterService == null, "JobMasterService must be null
before being started.");
+ checkState(
+ jobStatus.isInitializing(),
+ "Can only complete initialization in initializing state");
+
+ if (leaderElectionService.hasLeadership(leaderSessionId)) {
+ jobMasterService = newJobMasterService;
+ leaderGatewayFuture.complete(jobMasterService.getGateway());
+ leaderElectionService.confirmLeadership(leaderSessionId,
jobMasterService.getAddress());
+
+ if (jobStatus ==
JobManagerRunnerJobStatus.INITIALIZING_CANCELLING) {
+ checkState(cancelFuture != null);
+ FutureUtils.forward(
+
newJobMasterService.getGateway().cancel(cancelTimeout), cancelFuture);
+ }
+
+ jobStatus = JobManagerRunnerJobStatus.JOBMASTER_INITIALIZED;
+ } else {
+ log.info(
+ "Ignoring confirmation of leader session id because {} is
no longer the leader. Shutting down JobMaster",
+ getDescription());
+ return newJobMasterService.closeAsync();
+ }
+
+ return CompletableFuture.completedFuture(null);
+ }
+ @Nullable
+ private JobMasterService startJobMasterServiceSafely(UUID leaderSessionId)
{
try {
+ // We assume this operation to be potentially long-running (thus
it can not block the
Review comment:
Maybe: (thus it should not be executed by the JobManager's main thread)
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
##########
@@ -294,17 +293,18 @@ public void testBlobServerCleanupWhenJobNotFinished()
throws Exception {
@Test
public void testBlobServerCleanupWhenJobSubmissionFails() throws Exception
{
startDispatcher(new FailingJobManagerRunnerFactory(new
FlinkException("Test exception")));
- dispatcherGateway.submitJob(jobGraph, timeout).get();
-
- Optional<SerializedThrowable> maybeError =
- dispatcherGateway.requestJobResult(jobId,
timeout).get().getSerializedThrowable();
+ final CompletableFuture<Acknowledge> submissionFuture =
+ dispatcherGateway.submitJob(jobGraph, timeout);
- assertThat(maybeError.isPresent(), is(true));
- Throwable exception =
maybeError.get().deserializeError(this.getClass().getClassLoader());
+ try {
+ submissionFuture.get();
+ fail("Job submission was expected to fail.");
+ } catch (ExecutionException ee) {
+ assertThat(
+ ExceptionUtils.findThrowable(ee,
JobSubmissionException.class).isPresent(),
+ is(true));
Review comment:
I'd suggest using `FlinkMatchers.containsCause`.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImplTest.java
##########
@@ -328,4 +617,68 @@ private JobManagerRunnerImpl createJobManagerRunner(
fatalErrorHandler,
System.currentTimeMillis());
}
+
+ public static class BlockingJobMasterServiceFactory implements
JobMasterServiceFactory {
+
+ private final OneShotLatch blocker = new OneShotLatch();
+ private final BlockingQueue<TestingJobMasterService>
jobMasterServicesQueue =
+ new ArrayBlockingQueue(1);
+ private final Supplier<TestingJobMasterService>
testingJobMasterServiceSupplier;
+ private int numberOfJobMasterInstancesCreated = 0;
+ private FlinkException initializationException = null;
+
+ public BlockingJobMasterServiceFactory() {
+ this((JobMasterGateway) null);
+ }
+
+ public BlockingJobMasterServiceFactory(@Nullable JobMasterGateway
jobMasterGateway) {
+ this(() -> new TestingJobMasterService(null, null,
jobMasterGateway));
+ }
+
+ public BlockingJobMasterServiceFactory(
+ Supplier<TestingJobMasterService>
testingJobMasterServiceSupplier) {
+ this.testingJobMasterServiceSupplier =
testingJobMasterServiceSupplier;
+ }
+
+ @Override
+ public JobMasterService createJobMasterService(
+ JobGraph jobGraph,
+ JobMasterId jobMasterId,
+ OnCompletionActions jobCompletionActions,
+ ClassLoader userCodeClassloader,
+ long initializationTimestamp)
+ throws Exception {
+ TestingJobMasterService service =
testingJobMasterServiceSupplier.get();
+ jobMasterServicesQueue.offer(service);
+
+ blocker.await();
+ if (initializationException != null) {
+ throw initializationException;
+ }
+ numberOfJobMasterInstancesCreated++;
+ return service;
+ }
+
+ public void unblock() {
+ blocker.trigger();
+ }
+
+ public TestingJobMasterService waitForBlockingOnInit()
+ throws ExecutionException, InterruptedException {
+ return jobMasterServicesQueue.take();
+ }
+
+ public int getNumberOfJobMasterInstancesCreated() {
+ return numberOfJobMasterInstancesCreated;
+ }
+
+ public void failBlockingInitialization() {
+ Preconditions.checkState(
+ !blocker.isTriggered(),
+ "This only works before the initialization has been
unblocked");
+ this.initializationException =
Review comment:
If you set things from another thread, then we either need at least a
`volatile` or some other form of synchronization. An alternative to setting
this kind of field while the instance is being used is to configure it when
creating an instance of this class.
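A minimal sketch of the second option, with the failure passed in at construction time. `BlockingFactorySketch` is a hypothetical, simplified stand-in for the test factory (it uses a JDK `CountDownLatch` and returns a `String` instead of a service), so treat the names as assumptions.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical, simplified test factory; the names are illustrative only.
final class BlockingFactorySketch {

    private final CountDownLatch blocker = new CountDownLatch(1);

    // Assigned once in the constructor. A final field of a properly
    // constructed object is visible to other threads without volatile.
    private final Exception initializationFailure;

    /** @param initializationFailure failure to throw from create(), or null for success */
    BlockingFactorySketch(Exception initializationFailure) {
        this.initializationFailure = initializationFailure;
    }

    String create() throws Exception {
        // Blocks the creating thread until the test calls unblock().
        blocker.await();
        if (initializationFailure != null) {
            throw initializationFailure;
        }
        return "service";
    }

    void unblock() {
        blocker.countDown();
    }
}
```

A test that wants a failing initialization would then create the factory with the exception up front, and no field has to be mutated while another thread is blocked inside `create()`.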
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImplTest.java
##########
@@ -290,9 +302,286 @@ public void
testJobMasterCreationFailureCompletesJobManagerRunnerWithInitializat
assertTrue(
jobManagerRunnerResult.getInitializationFailure()
instanceof JobInitializationException);
+ assertThat(jobManagerRunnerResult.getInitializationFailure(),
containsCause(testException));
+ }
+
+ @Test
+ public void
testJobMasterShutDownOnRunnerShutdownDuringJobMasterInitialization()
+ throws Exception {
+ final BlockingJobMasterServiceFactory blockingJobMasterServiceFactory =
+ new BlockingJobMasterServiceFactory();
+
+ final JobManagerRunner jobManagerRunner =
+ createJobManagerRunner(blockingJobMasterServiceFactory);
+
+ jobManagerRunner.start();
+
+ leaderElectionService.isLeader(UUID.randomUUID());
+
+ TestingJobMasterService testingJobMasterService =
+ blockingJobMasterServiceFactory.waitForBlockingOnInit();
+
+ CompletableFuture<Void> closeFuture = jobManagerRunner.closeAsync();
+
+ blockingJobMasterServiceFactory.unblock();
+
+ closeFuture.get();
+
+ assertJobNotFinished(jobManagerRunner.getResultFuture());
+
+ assertThat(testingJobMasterService.isClosed(), is(true));
+ }
+
+ @Test
+ public void testJobMasterShutdownOnLeadershipLossDuringInitialization()
throws Exception {
+ final BlockingJobMasterServiceFactory blockingJobMasterServiceFactory =
+ new BlockingJobMasterServiceFactory();
+
+ final JobManagerRunner jobManagerRunner =
+ createJobManagerRunner(blockingJobMasterServiceFactory);
+
+ jobManagerRunner.start();
+
+ leaderElectionService.isLeader(UUID.randomUUID());
+
+ TestingJobMasterService testingJobMasterService =
+ blockingJobMasterServiceFactory.waitForBlockingOnInit();
+
+ leaderElectionService.notLeader();
+
+ blockingJobMasterServiceFactory.unblock();
+
+ // assert termination of testingJobMaster
+ testingJobMasterService.getTerminationFuture().get();
+ assertThat(testingJobMasterService.isClosed(), is(true));
+ }
+
+ @Test
+ public void testJobCancellationOnCancellationDuringInitialization() throws
Exception {
+ AtomicBoolean cancelCalled = new AtomicBoolean(false);
+ JobMasterGateway jobMasterGateway =
+ new TestingJobMasterGatewayBuilder()
+ .setCancelFunction(
+ () -> {
+ cancelCalled.set(true);
+ return
CompletableFuture.completedFuture(Acknowledge.get());
+ })
+ .build();
+
+ TestingJobMasterService testingJobMasterService =
+ new TestingJobMasterService(jobMasterGateway);
+ final BlockingJobMasterServiceFactory blockingJobMasterServiceFactory =
+ new BlockingJobMasterServiceFactory(() ->
testingJobMasterService);
+
+ final JobManagerRunner jobManagerRunner =
+ createJobManagerRunner(blockingJobMasterServiceFactory);
+
+ jobManagerRunner.start();
+
+ leaderElectionService.isLeader(UUID.randomUUID());
+
+ blockingJobMasterServiceFactory.waitForBlockingOnInit();
+
+ // cancel during init
+ CompletableFuture<Acknowledge> cancellationFuture =
+ jobManagerRunner.cancel(TESTING_TIMEOUT);
+
+ assertThat(cancellationFuture.isDone(), is(false));
+
+ blockingJobMasterServiceFactory.unblock();
+
+ // assert that cancellation future completes when cancellation
completes.
+ cancellationFuture.get();
+ assertThat(cancelCalled.get(), is(true));
+ }
+
+ @Test
+ public void testJobInformationOperationsDuringInitialization() throws
Exception {
+ final BlockingJobMasterServiceFactory blockingJobMasterServiceFactory =
+ new BlockingJobMasterServiceFactory();
+
+ final JobManagerRunner jobManagerRunner =
+ createJobManagerRunner(blockingJobMasterServiceFactory);
+
+ jobManagerRunner.start();
+
+ // assert initializing while waiting for leadership
+ assertInitializingStates(jobManagerRunner);
+
+ // assign leadership
+ leaderElectionService.isLeader(UUID.randomUUID());
+
+ // assert initializing while JobMaster is blocked
+ assertInitializingStates(jobManagerRunner);
+ blockingJobMasterServiceFactory.unblock();
+ }
+
+ private static void assertInitializingStates(JobManagerRunner
jobManagerRunner)
+ throws ExecutionException, InterruptedException {
+ assertThat(
+ jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get(),
+ is(JobStatus.INITIALIZING));
+ assertThat(jobManagerRunner.getResultFuture().isDone(), is(false));
assertThat(
- jobManagerRunnerResult.getInitializationFailure(),
- FlinkMatchers.containsCause(testException));
+ jobManagerRunner
+ .requestJob(TESTING_TIMEOUT)
+ .get()
+ .getArchivedExecutionGraph()
+ .getState(),
+ is(JobStatus.INITIALIZING));
+
+ assertThat(
+
jobManagerRunner.requestJobDetails(TESTING_TIMEOUT).get().getStatus(),
+ is(JobStatus.INITIALIZING));
+ }
+
+ @Test
+ public void testShutdownInInitializedState() throws Exception {
+ final JobManagerRunnerImpl jobManagerRunner = createJobManagerRunner();
+ jobManagerRunner.start();
+ // grant leadership to finish initialization
+ leaderElectionService.isLeader(UUID.randomUUID()).get();
+
+ assertThat(jobManagerRunner.isInitialized(), is(true));
+
+ jobManagerRunner.close();
+
+ assertJobNotFinished(jobManagerRunner.getResultFuture());
+ }
+
+ @Test
+ public void testShutdownWhileWaitingForCancellationDuringInitialization()
throws Exception {
+ final BlockingJobMasterServiceFactory blockingJobMasterServiceFactory =
+ new BlockingJobMasterServiceFactory();
+
+ final JobManagerRunner jobManagerRunner =
+ createJobManagerRunner(blockingJobMasterServiceFactory);
+
+ jobManagerRunner.start();
+
+ leaderElectionService.isLeader(UUID.randomUUID());
+
+ blockingJobMasterServiceFactory.waitForBlockingOnInit();
+
+ // cancel while initializing
+ assertThat(
+ jobManagerRunner.requestJobStatus(TESTING_TIMEOUT).get(),
+ is(JobStatus.INITIALIZING));
+
+ CompletableFuture<Acknowledge> cancelFuture =
jobManagerRunner.cancel(TESTING_TIMEOUT);
+ assertThat(cancelFuture.isDone(), is(false));
+
+ CompletableFuture<Void> closeFuture = jobManagerRunner.closeAsync();
+ assertThat(closeFuture.isDone(), is(false));
Review comment:
Why is this necessary?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
##########
@@ -408,19 +583,6 @@ private void jobAlreadyDone() {
}
}
- private void confirmLeaderSessionIdIfStillLeader(
- JobMasterService jobMasterService, UUID leaderSessionId) {
-
- if (leaderElectionService.hasLeadership(leaderSessionId)) {
- leaderGatewayFuture.complete(jobMasterService.getGateway());
- leaderElectionService.confirmLeadership(leaderSessionId, jobMasterService.getAddress());
- } else {
- log.debug(
- "Ignoring confirmation of leader session id because {} is no longer the leader.",
- getDescription());
- }
- }
-
@Override
public void revokeLeadership() {
Review comment:
My gut feeling is that we need to integrate the leadership calls (grant and
revoke) into the internal state management of the `JobManagerRunnerImpl`. For
example, if the `JobManagerRunner` is cancelled before a leadership operation
has started, it should probably not accept a `grantLeadership` call.
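
A rough sketch of what I have in mind, not the actual implementation. `RunnerStateSketch`, its `State` enum and all method bodies are made up for illustration. The only point is that grant, revoke and cancel all go through the same lock-guarded state, so a grant that arrives after cancellation is rejected.

```java
import java.util.UUID;

/** Hypothetical sketch: leadership calls are part of the runner's own state machine. */
final class RunnerStateSketch {

    /** Illustrative states only; the real runner has more of them. */
    enum State {
        WAITING_FOR_LEADERSHIP,
        INITIALIZING,
        CANCELLED
    }

    private final Object lock = new Object();

    // Both fields are only read and written while holding 'lock'.
    private State state = State.WAITING_FOR_LEADERSHIP;
    private UUID currentLeaderSession;

    /** Returns false if the grant is rejected because the runner was already cancelled. */
    boolean grantLeadership(UUID leaderSessionId) {
        synchronized (lock) {
            if (state == State.CANCELLED) {
                return false;
            }
            currentLeaderSession = leaderSessionId;
            state = State.INITIALIZING;
            return true;
        }
    }

    /** Drops the current session; a cancelled runner stays cancelled. */
    void revokeLeadership() {
        synchronized (lock) {
            currentLeaderSession = null;
            if (state != State.CANCELLED) {
                state = State.WAITING_FOR_LEADERSHIP;
            }
        }
    }

    /** Cancellation is terminal in this sketch. */
    void cancel() {
        synchronized (lock) {
            state = State.CANCELLED;
        }
    }

    State getState() {
        synchronized (lock) {
            return state;
        }
    }
}
```

With this shape, `cancel()` followed by `grantLeadership(...)` leaves the runner in `CANCELLED` instead of starting a new initialization.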
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
##########
@@ -348,17 +463,76 @@ private void startJobMaster(UUID leaderSessionId) throws FlinkException {
e);
}
- startJobMasterServiceSafely(leaderSessionId);
+ // run blocking JobMaster initialization outside of the lock
+ CompletableFuture<JobMasterService> jobMasterStartFuture =
+ CompletableFuture.supplyAsync(
+ () -> startJobMasterServiceSafely(leaderSessionId), executor);
+
+ return jobMasterStartFuture.thenCompose(
+ (newJobMasterService) -> {
+ synchronized (lock) {
+ if (newJobMasterService == null) {
+ // initialization failed
+ if (jobStatus == JobManagerRunnerJobStatus.INITIALIZING_CANCELLING) {
+ checkState(cancelFuture != null);
+ cancelFuture.completeExceptionally(
+ new FlinkException(
+ "Cancellation failed because JobMaster initialization failed"));
+ }
+ jobStatus = JobManagerRunnerJobStatus.JOBMASTER_INITIALIZED;
+ return CompletableFuture.completedFuture(null);
+ } else {
+ return onJobMasterInitializationCompletion(
+ newJobMasterService, leaderSessionId);
+ }
+ }
+ });
+ }
- if (jobMasterService != null) {
- confirmLeaderSessionIdIfStillLeader(jobMasterService, leaderSessionId);
+ @GuardedBy("lock")
+ private CompletableFuture<Void> onJobMasterInitializationCompletion(
+ JobMasterService newJobMasterService, UUID leaderSessionId) {
+ // JobMaster initialization is completed. Ensure proper state and leadership
+ if (shutdown) {
+ return newJobMasterService
+ .closeAsync()
+ .whenComplete(
+ (Void ignored, Throwable throwable) ->
+ onJobManagerTermination(throwable));
}
- }
- private void startJobMasterServiceSafely(UUID leaderSessionId) {
- checkState(jobMasterService == null, "JobMasterService must be null before being started.");
+ checkState(
+ jobStatus.isInitializing(),
+ "Can only complete initialization in initializing state");
+
+ if (leaderElectionService.hasLeadership(leaderSessionId)) {
+ jobMasterService = newJobMasterService;
+ leaderGatewayFuture.complete(jobMasterService.getGateway());
+ leaderElectionService.confirmLeadership(leaderSessionId, jobMasterService.getAddress());
+
+ if (jobStatus == JobManagerRunnerJobStatus.INITIALIZING_CANCELLING) {
+ checkState(cancelFuture != null);
+ // we have a pending cancellation. TODO forward original cancel timeout
+ FutureUtils.forward(
+ newJobMasterService.getGateway().cancel(cancelTimeout), cancelFuture);
+ }
+
+ jobStatus = JobManagerRunnerJobStatus.JOBMASTER_INITIALIZED;
+ } else {
+ log.info(
+ "Ignoring confirmation of leader session id because {} is no longer the leader. Shutting down JobMaster",
+ getDescription());
+ return newJobMasterService.closeAsync();
+ }
+
+ return CompletableFuture.completedFuture(null);
Review comment:
I would suggest moving this `return` into the `if` branch.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
##########
@@ -348,17 +462,76 @@ private void startJobMaster(UUID leaderSessionId) throws FlinkException {
e);
}
- startJobMasterServiceSafely(leaderSessionId);
+ // run blocking JobMaster initialization outside of the lock
+ CompletableFuture<JobMasterService> jobMasterStartFuture =
+ CompletableFuture.supplyAsync(
+ () -> startJobMasterServiceSafely(leaderSessionId), executor);
+
+ return jobMasterStartFuture.thenCompose(
+ (newJobMasterService) -> {
+ synchronized (lock) {
+ if (newJobMasterService == null) {
+ // initialization failed
+ if (jobStatus == JobManagerRunnerJobStatus.INITIALIZING_CANCELLING) {
+ checkState(cancelFuture != null);
+ cancelFuture.completeExceptionally(
+ new FlinkException(
+ "Cancellation failed because JobMaster initialization failed"));
+ }
+ jobStatus = JobManagerRunnerJobStatus.JOBMASTER_INITIALIZED;
Review comment:
Why is the status set to `JOBMASTER_INITIALIZED` if the JobMaster's creation failed?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
##########
@@ -101,6 +105,15 @@
private volatile CompletableFuture<JobMasterGateway> leaderGatewayFuture;
Review comment:
I would suggest properly annotating all fields in this class with their
guard now.
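
Something along these lines, as a sketch only. The `GuardedBy` annotation below is a local stand-in so the snippet compiles on its own; the class itself would use whichever `@GuardedBy` it already imports. `GuardedFieldsSketch` and the `String`-typed `jobStatus` are simplifications for illustration, not the real field types.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Local stand-in annotation (assumption): documents which lock protects a member.
@Retention(RetentionPolicy.SOURCE)
@Target({ElementType.FIELD, ElementType.METHOD})
@interface GuardedBy {
    String value();
}

/** Hypothetical sketch: every mutable field states the lock that guards it. */
final class GuardedFieldsSketch {

    private final Object lock = new Object();

    @GuardedBy("lock")
    private boolean shutdown;

    @GuardedBy("lock")
    private String jobStatus = "INITIALIZING"; // a String here only for brevity

    /** Only moves to the initialized status if the runner was not shut down. */
    void markInitialized() {
        synchronized (lock) {
            if (!shutdown) {
                jobStatus = "JOBMASTER_INITIALIZED";
            }
        }
    }

    void shutDown() {
        synchronized (lock) {
            shutdown = true;
        }
    }

    String getJobStatus() {
        synchronized (lock) {
            return jobStatus;
        }
    }
}
```

The annotation does not enforce anything by itself, but it makes it obvious to readers and static analysis which fields must only be touched inside `synchronized (lock)`.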
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerImpl.java
##########
@@ -383,11 +554,15 @@ private void startJobMasterServiceSafely(UUID leaderSessionId) {
}
}
});
- } catch (Exception e) {
+ return newJobMasterService;
+ } catch (Exception initializationError) {
resultFuture.complete(
JobManagerRunnerResult.forInitializationFailure(
new JobInitializationException(
- jobGraph.getJobID(), "Could not start the JobMaster.", e)));
+ jobGraph.getJobID(),
+ "Could not start the JobMaster.",
+ initializationError)));
+ return null;
Review comment:
This pattern is not very intuitive. It would be simpler if the
`startJobMasterServiceSafely` method returned a creation result that is
either a success or a failure. The rest can then be left to the caller. For
example, what happens if the `JobManagerRunner` has lost the leadership in the
meantime? In this case, the result should be ignored.
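
To illustrate, a minimal sketch under assumptions: `CreationResult`, `createSafely` and `handle` are hypothetical names, not existing Flink classes, and the returned strings only stand in for the real follow-up actions. The creation step just captures the outcome; the caller checks leadership first and only then looks at success or failure.

```java
import java.util.Objects;
import java.util.UUID;
import java.util.function.Supplier;

/** Hypothetical result type: either the created value or the failure that prevented creation. */
final class CreationResult<T> {
    private final T value; // non-null on success
    private final Throwable failure; // non-null on failure

    private CreationResult(T value, Throwable failure) {
        this.value = value;
        this.failure = failure;
    }

    static <T> CreationResult<T> success(T value) {
        return new CreationResult<>(Objects.requireNonNull(value), null);
    }

    static <T> CreationResult<T> failure(Throwable failure) {
        return new CreationResult<>(null, Objects.requireNonNull(failure));
    }

    boolean isSuccess() {
        return failure == null;
    }

    T getValue() {
        if (!isSuccess()) {
            throw new IllegalStateException("Creation failed.", failure);
        }
        return value;
    }

    Throwable getFailure() {
        if (isSuccess()) {
            throw new IllegalStateException("Creation succeeded.");
        }
        return failure;
    }
}

final class CreationResultSketch {

    /** Runs the factory and captures the outcome instead of completing futures as a side effect. */
    static <T> CreationResult<T> createSafely(Supplier<T> factory) {
        try {
            return CreationResult.success(factory.get());
        } catch (Exception e) {
            return CreationResult.failure(e);
        }
    }

    /** The caller decides: a result for a stale leader session is ignored, success or not. */
    static <T> String handle(CreationResult<T> result, UUID expectedLeader, UUID currentLeader) {
        if (!expectedLeader.equals(currentLeader)) {
            return "IGNORED";
        }
        return result.isSuccess() ? "STARTED" : "FAILED";
    }
}
```

This keeps `startJobMasterServiceSafely` free of decisions about `resultFuture` and leadership, and the `null` return value is no longer needed as a failure signal.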