This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d745f5b3f7a64445854c735668afa9b72edb3fee
Author: Weijie Guo <[email protected]>
AuthorDate: Mon Nov 7 15:28:25 2022 +0800

    [FLINK-29234] JobMasterServiceLeadershipRunner#closeAsync has a lock with 
an excessive range
    
    Currently, in JobMasterServiceLeadershipRunner, leader event callbacks such 
as grantLeadership and revokeLeadership are invoked while nested locks are held (e.g. 
curator's event lock and DefaultLeaderElectionService's lock) and then try to 
acquire JobMasterServiceLeadershipRunner's own lock. This creates several 
deadlock risks, for example when close and grantLeadership run simultaneously. To avoid 
this, we narrow the scope of the lock within 
JobMasterServiceLeadershipRunner#closeAsync. In the future, we'd bett [...]
    
    This closes #21137
---
 .../JobMasterServiceLeadershipRunner.java          |  58 +++---
 .../JobMasterServiceLeadershipRunnerTest.java      | 206 ++++++++++++++++++---
 .../jobmaster/TestingJobMasterServiceProcess.java  | 121 ++++++------
 3 files changed, 269 insertions(+), 116 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
index f11f552f725..fd1fedca3b1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
@@ -124,40 +124,40 @@ public class JobMasterServiceLeadershipRunner implements 
JobManagerRunner, Leade
 
     @Override
     public CompletableFuture<Void> closeAsync() {
+        final CompletableFuture<Void> processTerminationFuture;
         synchronized (lock) {
-            if (state != State.STOPPED) {
-                state = State.STOPPED;
+            if (state == State.STOPPED) {
+                return terminationFuture;
+            }
 
-                LOG.debug("Terminating the leadership runner for job {}.", 
getJobID());
+            state = State.STOPPED;
 
-                jobMasterGatewayFuture.completeExceptionally(
-                        new FlinkException(
-                                "JobMasterServiceLeadershipRunner is closed. 
Therefore, the corresponding JobMaster will never acquire the leadership."));
-                resultFuture.complete(
-                        JobManagerRunnerResult.forSuccess(
-                                
createExecutionGraphInfoWithJobStatus(JobStatus.SUSPENDED)));
-
-                final CompletableFuture<Void> processTerminationFuture =
-                        jobMasterServiceProcess.closeAsync();
-
-                final CompletableFuture<Void> serviceTerminationFuture =
-                        FutureUtils.runAfterwards(
-                                processTerminationFuture,
-                                () -> {
-                                    classLoaderLease.release();
-                                    leaderElectionService.stop();
-                                });
-
-                FutureUtils.forward(serviceTerminationFuture, 
terminationFuture);
-
-                terminationFuture.whenComplete(
-                        (unused, throwable) ->
-                                LOG.debug(
-                                        "Leadership runner for job {} has been 
terminated.",
-                                        getJobID()));
-            }
+            LOG.debug("Terminating the leadership runner for job {}.", 
getJobID());
+
+            jobMasterGatewayFuture.completeExceptionally(
+                    new FlinkException(
+                            "JobMasterServiceLeadershipRunner is closed. 
Therefore, the corresponding JobMaster will never acquire the leadership."));
+
+            resultFuture.complete(
+                    JobManagerRunnerResult.forSuccess(
+                            
createExecutionGraphInfoWithJobStatus(JobStatus.SUSPENDED)));
+
+            processTerminationFuture = jobMasterServiceProcess.closeAsync();
         }
 
+        final CompletableFuture<Void> serviceTerminationFuture =
+                FutureUtils.runAfterwards(
+                        processTerminationFuture,
+                        () -> {
+                            classLoaderLease.release();
+                            leaderElectionService.stop();
+                        });
+
+        FutureUtils.forward(serviceTerminationFuture, terminationFuture);
+
+        terminationFuture.whenComplete(
+                (unused, throwable) ->
+                        LOG.debug("Leadership runner for job {} has been 
terminated.", getJobID()));
         return terminationFuture;
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
index f4c4ebad405..cef3393b5d4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
@@ -35,15 +36,20 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import 
org.apache.flink.runtime.jobmaster.factories.JobMasterServiceProcessFactory;
 import 
org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceProcessFactory;
 import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
+import org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionDriver;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.TestingJobResultStore;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.ThrowingRunnable;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
@@ -151,8 +157,7 @@ class JobMasterServiceLeadershipRunnerTest {
                 newJobMasterServiceLeadershipRunnerBuilder()
                         .withJobMasterServiceProcesses(
                                 TestingJobMasterServiceProcess.newBuilder()
-                                        
.setTerminationFuture(terminationFuture)
-                                        
.withManualTerminationFutureCompletion()
+                                        .setCloseAsyncSupplier(() -> 
terminationFuture)
                                         .build(),
                                 
TestingJobMasterServiceProcess.newBuilder().build())
                         .build();
@@ -185,7 +190,7 @@ class JobMasterServiceLeadershipRunnerTest {
         final CompletableFuture<JobManagerRunnerResult> resultFuture = new 
CompletableFuture<>();
         final TestingJobMasterServiceProcess testingJobMasterServiceProcess =
                 TestingJobMasterServiceProcess.newBuilder()
-                        .setJobManagerRunnerResultFuture(resultFuture)
+                        .setGetResultFutureSupplier(() -> resultFuture)
                         .build();
 
         JobManagerRunner jobManagerRunner =
@@ -221,7 +226,7 @@ class JobMasterServiceLeadershipRunnerTest {
                 newJobMasterServiceLeadershipRunnerBuilder()
                         .withSingleJobMasterServiceProcess(
                                 TestingJobMasterServiceProcess.newBuilder()
-                                        
.setJobManagerRunnerResultFuture(completedResultFuture)
+                                        .setGetResultFutureSupplier(() -> 
completedResultFuture)
                                         .build())
                         .build();
 
@@ -256,7 +261,11 @@ class JobMasterServiceLeadershipRunnerTest {
                 newJobMasterServiceLeadershipRunnerBuilder()
                         .withSingleJobMasterServiceProcess(
                                 TestingJobMasterServiceProcess.newBuilder()
-                                        
.setTerminationFuture(terminationFuture)
+                                        .setCloseAsyncSupplier(
+                                                () -> {
+                                                    
terminationFuture.complete(null);
+                                                    return terminationFuture;
+                                                })
                                         .build())
                         .build();
 
@@ -277,7 +286,11 @@ class JobMasterServiceLeadershipRunnerTest {
                 newJobMasterServiceLeadershipRunnerBuilder()
                         .withSingleJobMasterServiceProcess(
                                 TestingJobMasterServiceProcess.newBuilder()
-                                        
.setTerminationFuture(terminationFuture)
+                                        .setCloseAsyncSupplier(
+                                                () -> {
+                                                    
terminationFuture.complete(null);
+                                                    return terminationFuture;
+                                                })
                                         .build())
                         .build();
 
@@ -298,7 +311,8 @@ class JobMasterServiceLeadershipRunnerTest {
                 newJobMasterServiceLeadershipRunnerBuilder()
                         .withSingleJobMasterServiceProcess(
                                 TestingJobMasterServiceProcess.newBuilder()
-                                        
.setJobMasterGatewayFuture(jobMasterGatewayFuture)
+                                        .setGetJobMasterGatewayFutureSupplier(
+                                                () -> jobMasterGatewayFuture)
                                         .build())
                         .build();
 
@@ -336,7 +350,7 @@ class JobMasterServiceLeadershipRunnerTest {
                 newJobMasterServiceLeadershipRunnerBuilder()
                         .withSingleJobMasterServiceProcess(
                                 TestingJobMasterServiceProcess.newBuilder()
-                                        .setIsInitialized(false)
+                                        .setIsInitializedAndRunningSupplier(() 
-> false)
                                         .build())
                         .build();
 
@@ -381,12 +395,15 @@ class JobMasterServiceLeadershipRunnerTest {
                 newJobMasterServiceLeadershipRunnerBuilder()
                         .withJobMasterServiceProcesses(
                                 TestingJobMasterServiceProcess.newBuilder()
-                                        
.setTerminationFuture(firstTerminationFuture)
-                                        
.withManualTerminationFutureCompletion()
-                                        .setIsInitialized(false)
+                                        .setCloseAsyncSupplier(() -> 
firstTerminationFuture)
+                                        .setIsInitializedAndRunningSupplier(() 
-> false)
                                         .build(),
                                 TestingJobMasterServiceProcess.newBuilder()
-                                        
.setTerminationFuture(secondTerminationFuture)
+                                        .setCloseAsyncSupplier(
+                                                () -> {
+                                                    
secondTerminationFuture.complete(null);
+                                                    return 
secondTerminationFuture;
+                                                })
                                         .build())
                         .build();
 
@@ -437,10 +454,11 @@ class JobMasterServiceLeadershipRunnerTest {
                 newJobMasterServiceLeadershipRunnerBuilder()
                         .withSingleJobMasterServiceProcess(
                                 TestingJobMasterServiceProcess.newBuilder()
-                                        .setIsInitialized(false)
-                                        .setJobMasterGatewayFuture(new 
CompletableFuture<>())
-                                        .setJobManagerRunnerResultFuture(
-                                                jobManagerRunnerResultFuture)
+                                        .setGetJobMasterGatewayFutureSupplier(
+                                                CompletableFuture::new)
+                                        .setGetResultFutureSupplier(
+                                                () -> 
jobManagerRunnerResultFuture)
+                                        .setIsInitializedAndRunningSupplier(() 
-> false)
                                         .build())
                         .build();
 
@@ -466,7 +484,7 @@ class JobMasterServiceLeadershipRunnerTest {
                 newJobMasterServiceLeadershipRunnerBuilder()
                         .withSingleJobMasterServiceProcess(
                                 TestingJobMasterServiceProcess.newBuilder()
-                                        
.setJobManagerRunnerResultFuture(resultFuture)
+                                        .setGetResultFutureSupplier(() -> 
resultFuture)
                                         .build())
                         .build();
 
@@ -489,7 +507,8 @@ class JobMasterServiceLeadershipRunnerTest {
                 newJobMasterServiceLeadershipRunnerBuilder()
                         .withSingleJobMasterServiceProcess(
                                 TestingJobMasterServiceProcess.newBuilder()
-                                        .setJobMasterGatewayFuture(new 
CompletableFuture<>())
+                                        .setGetJobMasterGatewayFutureSupplier(
+                                                CompletableFuture::new)
                                         .build())
                         .build();
 
@@ -512,7 +531,8 @@ class JobMasterServiceLeadershipRunnerTest {
                 newJobMasterServiceLeadershipRunnerBuilder()
                         .withSingleJobMasterServiceProcess(
                                 TestingJobMasterServiceProcess.newBuilder()
-                                        
.setLeaderAddressFuture(leaderAddressFuture)
+                                        .setGetLeaderAddressFutureSupplier(
+                                                () -> leaderAddressFuture)
                                         .build())
                         .build();
 
@@ -554,8 +574,20 @@ class JobMasterServiceLeadershipRunnerTest {
 
     @Test
     void testJobStatusCancellingIsClearedOnLeadershipLoss() throws Exception {
+        CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
         final JobMasterServiceLeadershipRunner jobManagerRunner =
-                newJobMasterServiceLeadershipRunnerBuilder().build();
+                newJobMasterServiceLeadershipRunnerBuilder()
+                        .withSingleJobMasterServiceProcess(
+                                TestingJobMasterServiceProcess.newBuilder()
+                                        .setCloseAsyncSupplier(
+                                                () -> {
+                                                    
terminationFuture.complete(null);
+                                                    return terminationFuture;
+                                                })
+                                        .setIsInitializedAndRunningSupplier(
+                                                () -> 
!terminationFuture.isDone())
+                                        .build())
+                        .build();
 
         jobManagerRunner.start();
 
@@ -575,8 +607,7 @@ class JobMasterServiceLeadershipRunnerTest {
                 newJobMasterServiceLeadershipRunnerBuilder()
                         .withSingleJobMasterServiceProcess(
                                 TestingJobMasterServiceProcess.newBuilder()
-                                        
.setTerminationFuture(terminationFuture)
-                                        
.withManualTerminationFutureCompletion()
+                                        .setCloseAsyncSupplier(() -> 
terminationFuture)
                                         .build())
                         .build();
 
@@ -643,6 +674,128 @@ class JobMasterServiceLeadershipRunnerTest {
         }
     }
 
+    @Test
+    void 
testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip()
+            throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory
+                testingLeaderElectionDriverFactory =
+                        new 
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+
+        // we need to use DefaultLeaderElectionService here because 
JobMasterServiceLeadershipRunner
+        // in connection with the DefaultLeaderElectionService generates the 
nested locking
+        final LeaderElectionService defaultLeaderElectionService =
+                new 
DefaultLeaderElectionService(testingLeaderElectionDriverFactory);
+
+        // latch to detect when we reached the first synchronized section 
having a lock on the
+        // JobMasterServiceProcess#stop side
+        final OneShotLatch closeAsyncCalledTrigger = new OneShotLatch();
+        // latch to halt the JobMasterServiceProcess#stop before calling stop 
on the
+        // DefaultLeaderElectionService instance (and entering the 
LeaderElectionService's
+        // synchronized block)
+        final OneShotLatch triggerClassLoaderLeaseRelease = new OneShotLatch();
+
+        final JobMasterServiceProcess jobMasterServiceProcess =
+                TestingJobMasterServiceProcess.newBuilder()
+                        
.setGetJobMasterGatewayFutureSupplier(CompletableFuture::new)
+                        .setGetResultFutureSupplier(CompletableFuture::new)
+                        .setGetLeaderAddressFutureSupplier(
+                                () -> 
CompletableFuture.completedFuture("unused address"))
+                        .setCloseAsyncSupplier(
+                                () -> {
+                                    closeAsyncCalledTrigger.trigger();
+                                    // we have to return a completed future 
because we need the
+                                    // follow-up task to run in the calling 
thread to make the
+                                    // follow-up code block being executed in 
the synchronized block
+                                    return 
CompletableFuture.completedFuture(null);
+                                })
+                        .build();
+        try (final JobMasterServiceLeadershipRunner jobManagerRunner =
+                newJobMasterServiceLeadershipRunnerBuilder()
+                        .setClassLoaderLease(
+                                TestingClassLoaderLease.newBuilder()
+                                        .setCloseRunnable(
+                                                () -> {
+                                                    try {
+                                                        // we want to wait 
with releasing to halt
+                                                        // before calling stop 
on the
+                                                        // 
DefaultLeaderElectionService
+                                                        
triggerClassLoaderLeaseRelease.await();
+                                                        // In order to 
reproduce the deadlock, we
+                                                        // need to ensure that
+                                                        // 
leaderContender#grantLeadership can be
+                                                        // called after
+                                                        // 
JobMasterServiceLeadershipRunner obtains
+                                                        // its own lock. 
Unfortunately, this will
+                                                        // change the running 
status of
+                                                        // 
DefaultLeaderElectionService
+                                                        // to false, which 
will cause the
+                                                        // notification of 
leadership to be
+                                                        // ignored. The issue 
is that we
+                                                        // don't have any 
means of verifying that we're
+                                                        // in the synchronized 
block of
+                                                        // 
DefaultLeaderElectionService#lock in
+                                                        // 
DefaultLeaderElectionService#onGrantLeadership,
+                                                        // but we trigger this 
implicitly through
+                                                        // 
TestingLeaderElectionDriver#isLeader().
+                                                        // Adding a short 
sleep can ensure that
+                                                        // another thread 
successfully receives the
+                                                        // leadership 
notification, so that the
+                                                        // deadlock problem 
can recur.
+                                                        Thread.sleep(5);
+                                                    } catch 
(InterruptedException e) {
+                                                        
ExceptionUtils.checkInterrupted(e);
+                                                    }
+                                                })
+                                        .build())
+                        .setJobMasterServiceProcessFactory(
+                                
TestingJobMasterServiceProcessFactory.newBuilder()
+                                        .setJobMasterServiceProcessFunction(
+                                                ignoredSessionId -> 
jobMasterServiceProcess)
+                                        .build())
+                        .setLeaderElectionService(defaultLeaderElectionService)
+                        .build()) {
+            jobManagerRunner.start();
+
+            final TestingLeaderElectionDriver currentLeaderDriver =
+                    Preconditions.checkNotNull(
+                            
testingLeaderElectionDriverFactory.getCurrentLeaderDriver());
+            // grant leadership to create jobMasterServiceProcess
+            currentLeaderDriver.isLeader();
+
+            while 
(currentLeaderDriver.getLeaderInformation().getLeaderSessionID() == null
+                    || !defaultLeaderElectionService.hasLeadership(
+                            
currentLeaderDriver.getLeaderInformation().getLeaderSessionID())) {
+                Thread.sleep(100);
+            }
+
+            final CheckedThread contenderCloseThread = 
createCheckedThread(jobManagerRunner::close);
+            contenderCloseThread.start();
+
+            // waiting for the contender reaching the synchronized section of 
the stop call
+            closeAsyncCalledTrigger.await();
+
+            final CheckedThread grantLeadershipThread =
+                    createCheckedThread(currentLeaderDriver::isLeader);
+            grantLeadershipThread.start();
+
+            // finalize ClassloaderLease release to trigger 
DefaultLeaderElectionService#stop
+            triggerClassLoaderLeaseRelease.trigger();
+
+            contenderCloseThread.sync();
+            grantLeadershipThread.sync();
+        }
+    }
+
+    private static CheckedThread createCheckedThread(
+            ThrowingRunnable<? extends Exception> callback) {
+        return new CheckedThread() {
+            @Override
+            public void go() throws Exception {
+                callback.run();
+            }
+        };
+    }
+
     private void 
assertJobNotFinished(CompletableFuture<JobManagerRunnerResult> resultFuture)
             throws ExecutionException, InterruptedException {
         final JobManagerRunnerResult jobManagerRunnerResult = 
resultFuture.get();
@@ -664,6 +817,9 @@ class JobMasterServiceLeadershipRunnerTest {
         private LibraryCacheManager.ClassLoaderLease classLoaderLease =
                 TestingClassLoaderLease.newBuilder().build();
 
+        private LeaderElectionService leaderElectionService =
+                
JobMasterServiceLeadershipRunnerTest.this.leaderElectionService;
+
         public JobMasterServiceLeadershipRunnerBuilder setClassLoaderLease(
                 LibraryCacheManager.ClassLoaderLease classLoaderLease) {
             this.classLoaderLease = classLoaderLease;
@@ -676,6 +832,12 @@ class JobMasterServiceLeadershipRunnerTest {
             return this;
         }
 
+        public JobMasterServiceLeadershipRunnerBuilder 
setLeaderElectionService(
+                LeaderElectionService leaderElectionService) {
+            this.leaderElectionService = leaderElectionService;
+            return this;
+        }
+
         public JobMasterServiceLeadershipRunner build() {
             return new JobMasterServiceLeadershipRunner(
                     jobMasterServiceProcessFactory,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterServiceProcess.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterServiceProcess.java
index fd3ae35033e..82edd4f0ae0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterServiceProcess.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobMasterServiceProcess.java
@@ -20,124 +20,115 @@ package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
 
-import javax.annotation.Nullable;
-
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 
 /** Testing implementation of {@link JobMasterServiceProcess}. */
 public class TestingJobMasterServiceProcess implements JobMasterServiceProcess 
{
 
-    private final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture;
-
-    private final CompletableFuture<JobManagerRunnerResult> 
jobManagerRunnerResultFuture;
-
-    private final CompletableFuture<String> leaderAddressFuture;
-
-    private final boolean isInitialized;
-
-    private final CompletableFuture<Void> terminationFuture;
-
-    private final boolean manualTerminationFutureCompletion;
+    private final Supplier<CompletableFuture<Void>> closeAsyncSupplier;
+    private final Supplier<Boolean> isInitializedAndRunningSupplier;
+    private final Supplier<CompletableFuture<JobMasterGateway>> 
getJobMasterGatewayFutureSupplier;
+    private final Supplier<CompletableFuture<JobManagerRunnerResult>> 
getResultFutureSupplier;
+    private final Supplier<CompletableFuture<String>> 
getLeaderAddressFutureSupplier;
 
     private TestingJobMasterServiceProcess(
-            CompletableFuture<JobMasterGateway> jobMasterGatewayFuture,
-            CompletableFuture<JobManagerRunnerResult> 
jobManagerRunnerResultFuture,
-            CompletableFuture<String> leaderAddressFuture,
-            boolean isInitialized,
-            CompletableFuture<Void> terminationFuture,
-            boolean manualTerminationFutureCompletion) {
-        this.jobMasterGatewayFuture = jobMasterGatewayFuture;
-        this.jobManagerRunnerResultFuture = jobManagerRunnerResultFuture;
-        this.leaderAddressFuture = leaderAddressFuture;
-        this.isInitialized = isInitialized;
-        this.terminationFuture = terminationFuture;
-        this.manualTerminationFutureCompletion = 
manualTerminationFutureCompletion;
+            Supplier<CompletableFuture<Void>> closeAsyncSupplier,
+            Supplier<Boolean> isInitializedAndRunningSupplier,
+            Supplier<CompletableFuture<JobMasterGateway>> 
getJobMasterGatewayFutureSupplier,
+            Supplier<CompletableFuture<JobManagerRunnerResult>> 
getResultFutureSupplier,
+            Supplier<CompletableFuture<String>> 
getLeaderAddressFutureSupplier) {
+        this.closeAsyncSupplier = closeAsyncSupplier;
+        this.isInitializedAndRunningSupplier = isInitializedAndRunningSupplier;
+        this.getJobMasterGatewayFutureSupplier = 
getJobMasterGatewayFutureSupplier;
+        this.getResultFutureSupplier = getResultFutureSupplier;
+        this.getLeaderAddressFutureSupplier = getLeaderAddressFutureSupplier;
     }
 
     @Override
     public CompletableFuture<Void> closeAsync() {
-        if (!manualTerminationFutureCompletion) {
-            terminationFuture.complete(null);
-        }
-
-        return terminationFuture;
+        return closeAsyncSupplier.get();
     }
 
     @Override
     public boolean isInitializedAndRunning() {
-        return isInitialized && !terminationFuture.isDone();
+        return isInitializedAndRunningSupplier.get();
     }
 
     @Override
     public CompletableFuture<JobMasterGateway> getJobMasterGatewayFuture() {
-        return jobMasterGatewayFuture;
+        return getJobMasterGatewayFutureSupplier.get();
     }
 
     @Override
     public CompletableFuture<JobManagerRunnerResult> getResultFuture() {
-        return jobManagerRunnerResultFuture;
+        return getResultFutureSupplier.get();
     }
 
     @Override
     public CompletableFuture<String> getLeaderAddressFuture() {
-        return leaderAddressFuture;
+        return getLeaderAddressFutureSupplier.get();
     }
 
     public static Builder newBuilder() {
         return new Builder();
     }
 
+    /** Builder for {@link TestingJobMasterServiceProcess}. */
     public static final class Builder {
-        private CompletableFuture<JobMasterGateway> jobMasterGatewayFuture =
-                CompletableFuture.completedFuture(new 
TestingJobMasterGatewayBuilder().build());
-        private CompletableFuture<JobManagerRunnerResult> 
jobManagerRunnerResultFuture =
-                new CompletableFuture<>();
-        private CompletableFuture<String> leaderAddressFuture =
-                CompletableFuture.completedFuture("foobar");
-        private boolean isInitialized = true;
-        @Nullable private CompletableFuture<Void> terminationFuture = new 
CompletableFuture<>();
-        private boolean manualTerminationFutureCompletion = false;
-
-        public Builder setJobMasterGatewayFuture(
-                CompletableFuture<JobMasterGateway> jobMasterGatewayFuture) {
-            this.jobMasterGatewayFuture = jobMasterGatewayFuture;
-            return this;
+        private Supplier<CompletableFuture<Void>> closeAsyncSupplier = 
unsupportedOperation();
+        private Supplier<Boolean> isInitializedAndRunningSupplier = 
unsupportedOperation();
+        private Supplier<CompletableFuture<JobMasterGateway>> 
getJobMasterGatewayFutureSupplier =
+                () ->
+                        CompletableFuture.completedFuture(
+                                new TestingJobMasterGatewayBuilder().build());
+        private Supplier<CompletableFuture<JobManagerRunnerResult>> 
getResultFutureSupplier =
+                CompletableFuture::new;
+        private Supplier<CompletableFuture<String>> 
getLeaderAddressFutureSupplier =
+                () -> CompletableFuture.completedFuture("leader address");
+
+        private static <T> Supplier<T> unsupportedOperation() {
+            return () -> {
+                throw new UnsupportedOperationException();
+            };
         }
 
-        public Builder setJobManagerRunnerResultFuture(
-                CompletableFuture<JobManagerRunnerResult> 
jobManagerRunnerResultFuture) {
-            this.jobManagerRunnerResultFuture = jobManagerRunnerResultFuture;
+        public Builder setCloseAsyncSupplier(Supplier<CompletableFuture<Void>> 
closeAsyncSupplier) {
+            this.closeAsyncSupplier = closeAsyncSupplier;
             return this;
         }
 
-        public Builder setLeaderAddressFuture(CompletableFuture<String> 
leaderAddressFuture) {
-            this.leaderAddressFuture = leaderAddressFuture;
+        public Builder setIsInitializedAndRunningSupplier(
+                Supplier<Boolean> isInitializedAndRunningSupplier) {
+            this.isInitializedAndRunningSupplier = 
isInitializedAndRunningSupplier;
             return this;
         }
 
-        public Builder setIsInitialized(boolean isInitialized) {
-            this.isInitialized = isInitialized;
+        public Builder setGetJobMasterGatewayFutureSupplier(
+                Supplier<CompletableFuture<JobMasterGateway>> 
getJobMasterGatewayFutureSupplier) {
+            this.getJobMasterGatewayFutureSupplier = 
getJobMasterGatewayFutureSupplier;
             return this;
         }
 
-        public Builder setTerminationFuture(@Nullable CompletableFuture<Void> 
terminationFuture) {
-            this.terminationFuture = terminationFuture;
+        public Builder setGetResultFutureSupplier(
+                Supplier<CompletableFuture<JobManagerRunnerResult>> 
getResultFutureSupplier) {
+            this.getResultFutureSupplier = getResultFutureSupplier;
             return this;
         }
 
-        public Builder withManualTerminationFutureCompletion() {
-            this.manualTerminationFutureCompletion = true;
+        public Builder setGetLeaderAddressFutureSupplier(
+                Supplier<CompletableFuture<String>> 
getLeaderAddressFutureSupplier) {
+            this.getLeaderAddressFutureSupplier = 
getLeaderAddressFutureSupplier;
             return this;
         }
 
         public TestingJobMasterServiceProcess build() {
             return new TestingJobMasterServiceProcess(
-                    jobMasterGatewayFuture,
-                    jobManagerRunnerResultFuture,
-                    leaderAddressFuture,
-                    isInitialized,
-                    terminationFuture,
-                    manualTerminationFutureCompletion);
+                    closeAsyncSupplier,
+                    isInitializedAndRunningSupplier,
+                    getJobMasterGatewayFutureSupplier,
+                    getResultFutureSupplier,
+                    getLeaderAddressFutureSupplier);
         }
     }
 }


Reply via email to