This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d745f5b3f7a64445854c735668afa9b72edb3fee Author: Weijie Guo <[email protected]> AuthorDate: Mon Nov 7 15:28:25 2022 +0800 [FLINK-29234] JobMasterServiceLeadershipRunner#closeAsync has a lock with an excessive range Currently, in JobMasterServiceLeadershipRunner, Leader event callback such as grantLeadership and revokeLeadership will be invoked with nested lock(e.g. curator's event lock and DefaultLeaderElectionService's lock) and try to acquire JobMasterServiceLeadershipRunner's own lock. This will lead to many deadlock risks, such as the simultaneous close and grantLeadership. To avoid this, we narrow the range of lock within JobMasterServiceLeadershipRunner#closeAsync. In the future, we'd bett [...] This closes #21137 --- .../JobMasterServiceLeadershipRunner.java | 58 +++--- .../JobMasterServiceLeadershipRunnerTest.java | 206 ++++++++++++++++++--- .../jobmaster/TestingJobMasterServiceProcess.java | 121 ++++++------ 3 files changed, 269 insertions(+), 116 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java index f11f552f725..fd1fedca3b1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java @@ -124,40 +124,40 @@ public class JobMasterServiceLeadershipRunner implements JobManagerRunner, Leade @Override public CompletableFuture<Void> closeAsync() { + final CompletableFuture<Void> processTerminationFuture; synchronized (lock) { - if (state != State.STOPPED) { - state = State.STOPPED; + if (state == State.STOPPED) { + return terminationFuture; + } - LOG.debug("Terminating the leadership runner for job {}.", getJobID()); + state = State.STOPPED; - jobMasterGatewayFuture.completeExceptionally( - new FlinkException( - "JobMasterServiceLeadershipRunner is closed. Therefore, the corresponding JobMaster will never acquire the leadership.")); - resultFuture.complete( - JobManagerRunnerResult.forSuccess( - createExecutionGraphInfoWithJobStatus(JobStatus.SUSPENDED))); - - final CompletableFuture<Void> processTerminationFuture = - jobMasterServiceProcess.closeAsync(); - - final CompletableFuture<Void> serviceTerminationFuture = - FutureUtils.runAfterwards( - processTerminationFuture, - () -> { - classLoaderLease.release(); - leaderElectionService.stop(); - }); - - FutureUtils.forward(serviceTerminationFuture, terminationFuture); - - terminationFuture.whenComplete( - (unused, throwable) -> - LOG.debug( - "Leadership runner for job {} has been terminated.", - getJobID())); - } + LOG.debug("Terminating the leadership runner for job {}.", getJobID()); + + jobMasterGatewayFuture.completeExceptionally( + new FlinkException( + "JobMasterServiceLeadershipRunner is closed. Therefore, the corresponding JobMaster will never acquire the leadership.")); + + resultFuture.complete( + JobManagerRunnerResult.forSuccess( + createExecutionGraphInfoWithJobStatus(JobStatus.SUSPENDED))); + + processTerminationFuture = jobMasterServiceProcess.closeAsync(); } + final CompletableFuture<Void> serviceTerminationFuture = + FutureUtils.runAfterwards( + processTerminationFuture, + () -> { + classLoaderLease.release(); + leaderElectionService.stop(); + }); + + FutureUtils.forward(serviceTerminationFuture, terminationFuture); + + terminationFuture.whenComplete( + (unused, throwable) -> + LOG.debug("Leadership runner for job {} has been terminated.", getJobID())); return terminationFuture; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java index f4c4ebad405..cef3393b5d4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.time.Time; +import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; @@ -35,15 +36,20 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceProcessFactory; import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceProcessFactory; import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder; +import org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionDriver; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.testutils.TestingJobResultStore; import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.ThrowingRunnable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -151,8 +157,7 @@ class JobMasterServiceLeadershipRunnerTest { newJobMasterServiceLeadershipRunnerBuilder() .withJobMasterServiceProcesses( TestingJobMasterServiceProcess.newBuilder() - .setTerminationFuture(terminationFuture) - .withManualTerminationFutureCompletion() + .setCloseAsyncSupplier(() -> terminationFuture) .build(), TestingJobMasterServiceProcess.newBuilder().build()) .build(); @@ -185,7 +190,7 @@ class JobMasterServiceLeadershipRunnerTest { final CompletableFuture<JobManagerRunnerResult> resultFuture = new CompletableFuture<>(); final TestingJobMasterServiceProcess testingJobMasterServiceProcess = TestingJobMasterServiceProcess.newBuilder() - .setJobManagerRunnerResultFuture(resultFuture) + .setGetResultFutureSupplier(() -> resultFuture) .build(); JobManagerRunner jobManagerRunner = @@ -221,7 +226,7 @@ class JobMasterServiceLeadershipRunnerTest { newJobMasterServiceLeadershipRunnerBuilder() .withSingleJobMasterServiceProcess( TestingJobMasterServiceProcess.newBuilder() - .setJobManagerRunnerResultFuture(completedResultFuture) + .setGetResultFutureSupplier(() -> completedResultFuture) .build()) .build(); @@ -256,7 +261,11 @@ class JobMasterServiceLeadershipRunnerTest { newJobMasterServiceLeadershipRunnerBuilder() .withSingleJobMasterServiceProcess( TestingJobMasterServiceProcess.newBuilder() - .setTerminationFuture(terminationFuture) + .setCloseAsyncSupplier( + () -> { + terminationFuture.complete(null); + return terminationFuture; + }) .build()) .build(); @@ -277,7 +286,11 @@ class JobMasterServiceLeadershipRunnerTest { newJobMasterServiceLeadershipRunnerBuilder() .withSingleJobMasterServiceProcess( TestingJobMasterServiceProcess.newBuilder() - .setTerminationFuture(terminationFuture) + .setCloseAsyncSupplier( + () -> { + terminationFuture.complete(null); + return terminationFuture; + }) .build()) .build(); @@ -298,7 +311,8 @@ class JobMasterServiceLeadershipRunnerTest { newJobMasterServiceLeadershipRunnerBuilder() .withSingleJobMasterServiceProcess( TestingJobMasterServiceProcess.newBuilder() - .setJobMasterGatewayFuture(jobMasterGatewayFuture) + .setGetJobMasterGatewayFutureSupplier( + () -> jobMasterGatewayFuture) .build()) .build(); @@ -336,7 +350,7 @@ class JobMasterServiceLeadershipRunnerTest { newJobMasterServiceLeadershipRunnerBuilder() .withSingleJobMasterServiceProcess( TestingJobMasterServiceProcess.newBuilder() - .setIsInitialized(false) + .setIsInitializedAndRunningSupplier(() -> false) .build()) .build(); @@ -381,12 +395,15 @@ class JobMasterServiceLeadershipRunnerTest { newJobMasterServiceLeadershipRunnerBuilder() .withJobMasterServiceProcesses( TestingJobMasterServiceProcess.newBuilder() - .setTerminationFuture(firstTerminationFuture) - .withManualTerminationFutureCompletion() - .setIsInitialized(false) + .setCloseAsyncSupplier(() -> firstTerminationFuture) + .setIsInitializedAndRunningSupplier(() -> false) .build(), TestingJobMasterServiceProcess.newBuilder() - .setTerminationFuture(secondTerminationFuture) + .setCloseAsyncSupplier( + () -> { + secondTerminationFuture.complete(null); + return secondTerminationFuture; + }) .build()) .build(); @@ -437,10 +454,11 @@ class JobMasterServiceLeadershipRunnerTest { newJobMasterServiceLeadershipRunnerBuilder() .withSingleJobMasterServiceProcess( TestingJobMasterServiceProcess.newBuilder() - .setIsInitialized(false) - .setJobMasterGatewayFuture(new CompletableFuture<>()) - .setJobManagerRunnerResultFuture( - jobManagerRunnerResultFuture) + .setGetJobMasterGatewayFutureSupplier( + CompletableFuture::new) + .setGetResultFutureSupplier( + () -> jobManagerRunnerResultFuture) + .setIsInitializedAndRunningSupplier(() -> false) .build()) .build(); @@ -466,7 +484,7 @@ class JobMasterServiceLeadershipRunnerTest { newJobMasterServiceLeadershipRunnerBuilder() .withSingleJobMasterServiceProcess( TestingJobMasterServiceProcess.newBuilder() - .setJobManagerRunnerResultFuture(resultFuture) + .setGetResultFutureSupplier(() -> resultFuture) .build()) .build(); @@ -489,7 +507,8 @@ class JobMasterServiceLeadershipRunnerTest { newJobMasterServiceLeadershipRunnerBuilder() .withSingleJobMasterServiceProcess( TestingJobMasterServiceProcess.newBuilder() - .setJobMasterGatewayFuture(new CompletableFuture<>()) + .setGetJobMasterGatewayFutureSupplier( + CompletableFuture::new) .build()) .build(); @@ -512,7 +531,8 @@ class JobMasterServiceLeadershipRunnerTest { newJobMasterServiceLeadershipRunnerBuilder() .withSingleJobMasterServiceProcess( TestingJobMasterServiceProcess.newBuilder() - .setLeaderAddressFuture(leaderAddressFuture) + .setGetLeaderAddressFutureSupplier( + () -> leaderAddressFuture) .build()) .build(); @@ -554,8 +574,20 @@ class JobMasterServiceLeadershipRunnerTest { @Test void testJobStatusCancellingIsClearedOnLeadershipLoss() throws Exception { + CompletableFuture<Void> terminationFuture = new CompletableFuture<>(); final JobMasterServiceLeadershipRunner jobManagerRunner = - newJobMasterServiceLeadershipRunnerBuilder().build(); + newJobMasterServiceLeadershipRunnerBuilder() + .withSingleJobMasterServiceProcess( + TestingJobMasterServiceProcess.newBuilder() + .setCloseAsyncSupplier( + () -> { + terminationFuture.complete(null); + return terminationFuture; + }) + .setIsInitializedAndRunningSupplier( + () -> !terminationFuture.isDone()) + .build()) + .build(); jobManagerRunner.start(); @@ -575,8 +607,7 @@ class JobMasterServiceLeadershipRunnerTest { newJobMasterServiceLeadershipRunnerBuilder() .withSingleJobMasterServiceProcess( TestingJobMasterServiceProcess.newBuilder() - .setTerminationFuture(terminationFuture) - .withManualTerminationFutureCompletion() + .setCloseAsyncSupplier(() -> terminationFuture) .build()) .build(); @@ -643,6 +674,128 @@ class JobMasterServiceLeadershipRunnerTest { } } + @Test + void testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip() + throws Exception { + final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory + testingLeaderElectionDriverFactory = + new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(); + + // we need to use DefaultLeaderElectionService here because JobMasterServiceLeadershipRunner + // in connection with the DefaultLeaderElectionService generates the nested locking + final LeaderElectionService defaultLeaderElectionService = + new DefaultLeaderElectionService(testingLeaderElectionDriverFactory); + + // latch to detect when we reached the first synchronized section having a lock on the + // JobMasterServiceProcess#stop side + final OneShotLatch closeAsyncCalledTrigger = new OneShotLatch(); + // latch to halt the JobMasterServiceProcess#stop before calling stop on the + // DefaultLeaderElectionService instance (and entering the LeaderElectionService's + // synchronized block) + final OneShotLatch triggerClassLoaderLeaseRelease = new OneShotLatch(); + + final JobMasterServiceProcess jobMasterServiceProcess = + TestingJobMasterServiceProcess.newBuilder() + .setGetJobMasterGatewayFutureSupplier(CompletableFuture::new) + .setGetResultFutureSupplier(CompletableFuture::new) + .setGetLeaderAddressFutureSupplier( + () -> CompletableFuture.completedFuture("unused address")) + .setCloseAsyncSupplier( + () -> { + closeAsyncCalledTrigger.trigger(); + // we have to return a completed future because we need the + // follow-up task to run in the calling thread to make the + // follow-up code block being executed in the synchronized block + return CompletableFuture.completedFuture(null); + }) + .build(); + try (final JobMasterServiceLeadershipRunner jobManagerRunner = + newJobMasterServiceLeadershipRunnerBuilder() + .setClassLoaderLease( + TestingClassLoaderLease.newBuilder() + .setCloseRunnable( + () -> { + try { + // we want to wait with releasing to halt + // before calling stop on the + // DefaultLeaderElectionService + triggerClassLoaderLeaseRelease.await(); + // In order to reproduce the deadlock, we + // need to ensure that + // leaderContender#grantLeadership can be + // called after + // JobMasterServiceLeadershipRunner obtains + // its own lock. Unfortunately, This will + // change the running status of + // DefaultLeaderElectionService + // to false, which will cause the + // notification of leadership to be + // ignored. The issue is that we + // don't have any means of verify that we're + // in the synchronized block of + // DefaultLeaderElectionService#lock in + // DefaultLeaderElectionService#onGrantLeadership, + // but we trigger this implicitly through + // TestingLeaderElectionDriver#isLeader(). + // Adding a short sleep can ensure that + // another thread successfully receives the + // leadership notification, so that the + // deadlock problem can recur. + Thread.sleep(5); + } catch (InterruptedException e) { + ExceptionUtils.checkInterrupted(e); + } + }) + .build()) + .setJobMasterServiceProcessFactory( + TestingJobMasterServiceProcessFactory.newBuilder() + .setJobMasterServiceProcessFunction( + ignoredSessionId -> jobMasterServiceProcess) + .build()) + .setLeaderElectionService(defaultLeaderElectionService) + .build()) { + jobManagerRunner.start(); + + final TestingLeaderElectionDriver currentLeaderDriver = + Preconditions.checkNotNull( + testingLeaderElectionDriverFactory.getCurrentLeaderDriver()); + // grant leadership to create jobMasterServiceProcess + currentLeaderDriver.isLeader(); + + while (currentLeaderDriver.getLeaderInformation().getLeaderSessionID() == null + || !defaultLeaderElectionService.hasLeadership( + currentLeaderDriver.getLeaderInformation().getLeaderSessionID())) { + Thread.sleep(100); + } + + final CheckedThread contenderCloseThread = createCheckedThread(jobManagerRunner::close); + contenderCloseThread.start(); + + // waiting for the contender reaching the synchronized section of the stop call + closeAsyncCalledTrigger.await(); + + final CheckedThread grantLeadershipThread = + createCheckedThread(currentLeaderDriver::isLeader); + grantLeadershipThread.start(); + + // finalize ClassloaderLease release to trigger DefaultLeaderElectionService#stop + triggerClassLoaderLeaseRelease.trigger(); + + contenderCloseThread.sync(); + grantLeadershipThread.sync(); + } + } + + private static CheckedThread createCheckedThread( + ThrowingRunnable<? extends Exception> callback) { + return new CheckedThread() { + @Override + public void go() throws Exception { + callback.run(); + } + }; + } + private void assertJobNotFinished(CompletableFuture<JobManagerRunnerResult> resultFuture) throws ExecutionException, InterruptedException { final JobManagerRunnerResult jobManagerRunnerResult = resultFuture.get(); @@ -664,6 +817,9 @@ class JobMasterServiceLeadershipRunnerTest { private LibraryCacheManager.ClassLoaderLease classLoaderLease = TestingClassLoaderLease.newBuilder().build(); + private LeaderElectionService leaderElectionService = + JobMasterServiceLeadershipRunnerTest.this.leaderElectionService; + public JobMasterServiceLeadershipRunnerBuilder setClassLoaderLease( LibraryCacheManager.ClassLoaderLease classLoaderLease) { this.classLoaderLease = classLoaderLease; @@ -676,6 +832,12 @@ class JobMasterServiceLeadershipRunnerTest { return this; } + public JobMasterServiceLeadershipRunnerBuilder setLeaderElectionService( + LeaderElectionService leaderElectionService) { + this.leaderElectionService = leaderElectionService; + return this; + } + public JobMasterServiceLeadershipRunner build() { return new JobMasterServiceLeadershipRunner( jobMasterServiceProcessFactory, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterServiceProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterServiceProcess.java index fd3ae35033e..82edd4f0ae0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterServiceProcess.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterServiceProcess.java @@ -20,124 +20,115 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder; -import javax.annotation.Nullable; - import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; /** Testing implementation of {@link JobMasterServiceProcess}. */ public class TestingJobMasterServiceProcess implements JobMasterServiceProcess { - private final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture; - - private final CompletableFuture<JobManagerRunnerResult> jobManagerRunnerResultFuture; - - private final CompletableFuture<String> leaderAddressFuture; - - private final boolean isInitialized; - - private final CompletableFuture<Void> terminationFuture; - - private final boolean manualTerminationFutureCompletion; + private final Supplier<CompletableFuture<Void>> closeAsyncSupplier; + private final Supplier<Boolean> isInitializedAndRunningSupplier; + private final Supplier<CompletableFuture<JobMasterGateway>> getJobMasterGatewayFutureSupplier; + private final Supplier<CompletableFuture<JobManagerRunnerResult>> getResultFutureSupplier; + private final Supplier<CompletableFuture<String>> getLeaderAddressFutureSupplier; private TestingJobMasterServiceProcess( - CompletableFuture<JobMasterGateway> jobMasterGatewayFuture, - CompletableFuture<JobManagerRunnerResult> jobManagerRunnerResultFuture, - CompletableFuture<String> leaderAddressFuture, - boolean isInitialized, - CompletableFuture<Void> terminationFuture, - boolean manualTerminationFutureCompletion) { - this.jobMasterGatewayFuture = jobMasterGatewayFuture; - this.jobManagerRunnerResultFuture = jobManagerRunnerResultFuture; - this.leaderAddressFuture = leaderAddressFuture; - this.isInitialized = isInitialized; - this.terminationFuture = terminationFuture; - this.manualTerminationFutureCompletion = manualTerminationFutureCompletion; + Supplier<CompletableFuture<Void>> closeAsyncSupplier, + Supplier<Boolean> isInitializedAndRunningSupplier, + Supplier<CompletableFuture<JobMasterGateway>> getJobMasterGatewayFutureSupplier, + Supplier<CompletableFuture<JobManagerRunnerResult>> getResultFutureSupplier, + Supplier<CompletableFuture<String>> getLeaderAddressFutureSupplier) { + this.closeAsyncSupplier = closeAsyncSupplier; + this.isInitializedAndRunningSupplier = isInitializedAndRunningSupplier; + this.getJobMasterGatewayFutureSupplier = getJobMasterGatewayFutureSupplier; + this.getResultFutureSupplier = getResultFutureSupplier; + this.getLeaderAddressFutureSupplier = getLeaderAddressFutureSupplier; } @Override public CompletableFuture<Void> closeAsync() { - if (!manualTerminationFutureCompletion) { - terminationFuture.complete(null); - } - - return terminationFuture; + return closeAsyncSupplier.get(); } @Override public boolean isInitializedAndRunning() { - return isInitialized && !terminationFuture.isDone(); + return isInitializedAndRunningSupplier.get(); } @Override public CompletableFuture<JobMasterGateway> getJobMasterGatewayFuture() { - return jobMasterGatewayFuture; + return getJobMasterGatewayFutureSupplier.get(); } @Override public CompletableFuture<JobManagerRunnerResult> getResultFuture() { - return jobManagerRunnerResultFuture; + return getResultFutureSupplier.get(); } @Override public CompletableFuture<String> getLeaderAddressFuture() { - return leaderAddressFuture; + return getLeaderAddressFutureSupplier.get(); } public static Builder newBuilder() { return new Builder(); } + /** Builder for {@link TestingJobMasterServiceProcess}. */ public static final class Builder { - private CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = - CompletableFuture.completedFuture(new TestingJobMasterGatewayBuilder().build()); - private CompletableFuture<JobManagerRunnerResult> jobManagerRunnerResultFuture = - new CompletableFuture<>(); - private CompletableFuture<String> leaderAddressFuture = - CompletableFuture.completedFuture("foobar"); - private boolean isInitialized = true; - @Nullable private CompletableFuture<Void> terminationFuture = new CompletableFuture<>(); - private boolean manualTerminationFutureCompletion = false; - - public Builder setJobMasterGatewayFuture( - CompletableFuture<JobMasterGateway> jobMasterGatewayFuture) { - this.jobMasterGatewayFuture = jobMasterGatewayFuture; - return this; + private Supplier<CompletableFuture<Void>> closeAsyncSupplier = unsupportedOperation(); + private Supplier<Boolean> isInitializedAndRunningSupplier = unsupportedOperation(); + private Supplier<CompletableFuture<JobMasterGateway>> getJobMasterGatewayFutureSupplier = + () -> + CompletableFuture.completedFuture( + new TestingJobMasterGatewayBuilder().build()); + private Supplier<CompletableFuture<JobManagerRunnerResult>> getResultFutureSupplier = + CompletableFuture::new; + private Supplier<CompletableFuture<String>> getLeaderAddressFutureSupplier = + () -> CompletableFuture.completedFuture("leader address"); + + private static <T> Supplier<T> unsupportedOperation() { + return () -> { + throw new UnsupportedOperationException(); + }; } - public Builder setJobManagerRunnerResultFuture( - CompletableFuture<JobManagerRunnerResult> jobManagerRunnerResultFuture) { - this.jobManagerRunnerResultFuture = jobManagerRunnerResultFuture; + public Builder setCloseAsyncSupplier(Supplier<CompletableFuture<Void>> closeAsyncSupplier) { + this.closeAsyncSupplier = closeAsyncSupplier; return this; } - public Builder setLeaderAddressFuture(CompletableFuture<String> leaderAddressFuture) { - this.leaderAddressFuture = leaderAddressFuture; + public Builder setIsInitializedAndRunningSupplier( + Supplier<Boolean> isInitializedAndRunningSupplier) { + this.isInitializedAndRunningSupplier = isInitializedAndRunningSupplier; return this; } - public Builder setIsInitialized(boolean isInitialized) { - this.isInitialized = isInitialized; + public Builder setGetJobMasterGatewayFutureSupplier( + Supplier<CompletableFuture<JobMasterGateway>> getJobMasterGatewayFutureSupplier) { + this.getJobMasterGatewayFutureSupplier = getJobMasterGatewayFutureSupplier; return this; } - public Builder setTerminationFuture(@Nullable CompletableFuture<Void> terminationFuture) { - this.terminationFuture = terminationFuture; + public Builder setGetResultFutureSupplier( + Supplier<CompletableFuture<JobManagerRunnerResult>> getResultFutureSupplier) { + this.getResultFutureSupplier = getResultFutureSupplier; return this; } - public Builder withManualTerminationFutureCompletion() { - this.manualTerminationFutureCompletion = true; + public Builder setGetLeaderAddressFutureSupplier( + Supplier<CompletableFuture<String>> getLeaderAddressFutureSupplier) { + this.getLeaderAddressFutureSupplier = getLeaderAddressFutureSupplier; return this; } public TestingJobMasterServiceProcess build() { return new TestingJobMasterServiceProcess( - jobMasterGatewayFuture, - jobManagerRunnerResultFuture, - leaderAddressFuture, - isInitialized, - terminationFuture, - manualTerminationFutureCompletion); + closeAsyncSupplier, + isInitializedAndRunningSupplier, + getJobMasterGatewayFutureSupplier, + getResultFutureSupplier, + getLeaderAddressFutureSupplier); } } }
