XComp commented on a change in pull request #18650:
URL: https://github.com/apache/flink/pull/18650#discussion_r801459535



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
##########
@@ -38,228 +48,457 @@
  * A variant of the HighAvailabilityServices for testing. Each individual 
service can be set to an
  * arbitrary implementation, such as a mock or default service.
  */
-public class TestingHighAvailabilityServices implements 
HighAvailabilityServices {
-
-    private volatile LeaderRetrievalService resourceManagerLeaderRetriever;
-
-    private volatile LeaderRetrievalService dispatcherLeaderRetriever;
-
-    private volatile LeaderRetrievalService clusterRestEndpointLeaderRetriever;
+public abstract class TestingHighAvailabilityServices implements 
HighAvailabilityServices {
+
+    public static EmptyBuilder newBuilder() {
+        return new EmptyBuilder();
+    }
+
+    public static EmptyBuilder newStandaloneBuilder() {
+        return new EmptyBuilder()
+                .setCheckpointRecoveryFactory(new 
StandaloneCheckpointRecoveryFactory())
+                .setJobGraphStore(new StandaloneJobGraphStore())
+                .setDispatcherLeaderRetriever(
+                        new StandaloneLeaderRetrievalService(
+                                "localhost", 
HighAvailabilityServices.DEFAULT_LEADER_ID))
+                .setResourceManagerLeaderRetriever(
+                        new StandaloneLeaderRetrievalService(
+                                "localhost", 
HighAvailabilityServices.DEFAULT_LEADER_ID))
+                .setClusterRestEndpointLeaderRetriever(
+                        new StandaloneLeaderRetrievalService(
+                                "localhost", 
HighAvailabilityServices.DEFAULT_LEADER_ID))
+                .setDispatcherLeaderElectionService(new 
StandaloneLeaderElectionService())
+                .setResourceManagerLeaderElectionService(new 
StandaloneLeaderElectionService())
+                .setClusterRestEndpointLeaderElectionService(new 
StandaloneLeaderElectionService())
+                .setJobMasterLeaderRetrieverFunction(
+                        jobId ->
+                                new StandaloneLeaderRetrievalService(
+                                        "localhost", 
HighAvailabilityServices.DEFAULT_LEADER_ID))
+                .setJobMasterLeaderElectionServiceFunction(
+                        jobId -> new StandaloneLeaderElectionService());
+    }
+
+    public static EmbeddedBuilder newEmbeddedBuilder(Executor executor) {
+        return new EmbeddedBuilder(executor);
+    }
+
+    public abstract static class Builder<T extends Builder<T>> {
+
+        protected @Nullable CheckpointRecoveryFactory 
checkpointRecoveryFactory;
+        protected @Nullable JobGraphStore jobGraphStore;
+        protected JobResultStore jobResultStore = new EmbeddedJobResultStore();
+        protected CompletableFuture<Void> closeFuture = new 
CompletableFuture<>();
+        protected CompletableFuture<Void> closeAndCleanupAllDataFuture = new 
CompletableFuture<>();
+        protected CompletableFuture<JobID> globalCleanupFuture = new 
CompletableFuture<>();
+
+        private Builder() {}
+
+        public T setCheckpointRecoveryFactory(CheckpointRecoveryFactory 
checkpointRecoveryFactory) {
+            this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private volatile Function<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrieverFunction =
-            ignored -> null;
+        public T setJobGraphStore(JobGraphStore jobGraphStore) {
+            this.jobGraphStore = jobGraphStore;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private volatile Function<JobID, LeaderElectionService> 
jobMasterLeaderElectionServiceFunction =
-            ignored -> null;
+        public T setJobResultStore(JobResultStore jobResultStore) {
+            this.jobResultStore = jobResultStore;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private ConcurrentHashMap<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrievers =
-            new ConcurrentHashMap<>();
+        public T setCloseFuture(CompletableFuture<Void> closeFuture) {
+            this.closeFuture = closeFuture;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private ConcurrentHashMap<JobID, LeaderElectionService> 
jobManagerLeaderElectionServices =
-            new ConcurrentHashMap<>();
+        public T setCloseAndCleanupAllDataFuture(
+                CompletableFuture<Void> closeAndCleanupAllDataFuture) {
+            this.closeAndCleanupAllDataFuture = closeAndCleanupAllDataFuture;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private volatile LeaderElectionService 
resourceManagerLeaderElectionService;
+        public T setGlobalCleanupFuture(CompletableFuture<JobID> 
globalCleanupFuture) {
+            this.globalCleanupFuture = globalCleanupFuture;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private volatile LeaderElectionService dispatcherLeaderElectionService;
+        public abstract TestingHighAvailabilityServices build();
+    }
 
-    private volatile LeaderElectionService 
clusterRestEndpointLeaderElectionService;
+    public static class EmbeddedBuilder extends Builder<EmbeddedBuilder> {
 
-    private volatile CheckpointRecoveryFactory checkpointRecoveryFactory;
+        private final Executor executor;
 
-    private volatile JobGraphStore jobGraphStore;
+        private EmbeddedBuilder(Executor executor) {
+            this.executor = executor;
+        }
 
-    private volatile JobResultStore jobResultStore = new 
EmbeddedJobResultStore();
+        @Override
+        public TestingHighAvailabilityServices build() {
+            try {
+                return new EmbeddedLeaderElection(
+                        checkpointRecoveryFactory,
+                        jobGraphStore,
+                        jobResultStore,
+                        closeFuture,
+                        closeAndCleanupAllDataFuture,
+                        globalCleanupFuture,
+                        new EmbeddedHaServicesWithLeadershipControl(executor));
+            } catch (Exception e) {
+                throw new IllegalStateException("Error building embedded 
services.", e);
+            }
+        }
+    }
 
-    private CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+    public static class EmptyBuilder extends Builder<EmptyBuilder> {
+
+        private LeaderRetrievalService dispatcherLeaderRetriever;
+        private LeaderRetrievalService resourceManagerLeaderRetriever;
+        private LeaderRetrievalService clusterRestEndpointLeaderRetriever;
+
+        private LeaderElectionService resourceManagerLeaderElectionService;
+        private LeaderElectionService dispatcherLeaderElectionService;
+        private LeaderElectionService clusterRestEndpointLeaderElectionService;
+
+        private Function<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrieverFunction =
+                ignored -> null;
+        private Function<JobID, LeaderElectionService> 
jobMasterLeaderElectionServiceFunction =
+                ignored -> null;
+
+        private EmptyBuilder() {}
+
+        public TestingHighAvailabilityServices build() {
+            return new ManualLeaderElection(
+                    checkpointRecoveryFactory,
+                    jobGraphStore,
+                    jobResultStore,
+                    closeFuture,
+                    closeAndCleanupAllDataFuture,
+                    globalCleanupFuture,
+                    dispatcherLeaderRetriever,
+                    resourceManagerLeaderRetriever,
+                    clusterRestEndpointLeaderRetriever,
+                    dispatcherLeaderElectionService,
+                    resourceManagerLeaderElectionService,
+                    clusterRestEndpointLeaderElectionService,
+                    jobMasterLeaderRetrieverFunction,
+                    jobMasterLeaderElectionServiceFunction);
+        }
 
-    private CompletableFuture<Void> closeAndCleanupAllDataFuture = new 
CompletableFuture<>();
+        public EmptyBuilder setResourceManagerLeaderRetriever(
+                LeaderRetrievalService resourceManagerLeaderRetriever) {
+            this.resourceManagerLeaderRetriever = 
resourceManagerLeaderRetriever;
+            return this;
+        }
 
-    private volatile CompletableFuture<JobID> globalCleanupFuture;
+        public EmptyBuilder setDispatcherLeaderRetriever(
+                LeaderRetrievalService dispatcherLeaderRetriever) {
+            this.dispatcherLeaderRetriever = dispatcherLeaderRetriever;
+            return this;
+        }
 
-    // ------------------------------------------------------------------------
-    //  Setters for mock / testing implementations
-    // ------------------------------------------------------------------------
+        public EmptyBuilder setClusterRestEndpointLeaderRetriever(
+                final LeaderRetrievalService 
clusterRestEndpointLeaderRetriever) {
+            this.clusterRestEndpointLeaderRetriever = 
clusterRestEndpointLeaderRetriever;
+            return this;
+        }
 
-    public void setResourceManagerLeaderRetriever(
-            LeaderRetrievalService resourceManagerLeaderRetriever) {
-        this.resourceManagerLeaderRetriever = resourceManagerLeaderRetriever;
-    }
+        public EmptyBuilder setResourceManagerLeaderElectionService(
+                LeaderElectionService leaderElectionService) {
+            this.resourceManagerLeaderElectionService = leaderElectionService;
+            return this;
+        }
 
-    public void setDispatcherLeaderRetriever(LeaderRetrievalService 
dispatcherLeaderRetriever) {
-        this.dispatcherLeaderRetriever = dispatcherLeaderRetriever;
-    }
+        public EmptyBuilder setDispatcherLeaderElectionService(
+                LeaderElectionService leaderElectionService) {
+            this.dispatcherLeaderElectionService = leaderElectionService;
+            return this;
+        }
 
-    public void setClusterRestEndpointLeaderRetriever(
-            final LeaderRetrievalService clusterRestEndpointLeaderRetriever) {
-        this.clusterRestEndpointLeaderRetriever = 
clusterRestEndpointLeaderRetriever;
-    }
+        public EmptyBuilder setClusterRestEndpointLeaderElectionService(
+                final LeaderElectionService 
clusterRestEndpointLeaderElectionService) {
+            this.clusterRestEndpointLeaderElectionService =
+                    clusterRestEndpointLeaderElectionService;
+            return this;
+        }
 
-    public void setJobMasterLeaderRetriever(
-            JobID jobID, LeaderRetrievalService jobMasterLeaderRetriever) {
-        this.jobMasterLeaderRetrievers.put(jobID, jobMasterLeaderRetriever);
-    }
+        public EmptyBuilder setJobMasterLeaderElectionServiceFunction(
+                Function<JobID, LeaderElectionService> 
jobMasterLeaderElectionServiceFunction) {
+            this.jobMasterLeaderElectionServiceFunction = 
jobMasterLeaderElectionServiceFunction;
+            return this;
+        }
 
-    public void setJobMasterLeaderElectionService(
-            JobID jobID, LeaderElectionService leaderElectionService) {
-        this.jobManagerLeaderElectionServices.put(jobID, 
leaderElectionService);
+        public EmptyBuilder setJobMasterLeaderRetrieverFunction(
+                Function<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrieverFunction) {
+            this.jobMasterLeaderRetrieverFunction = 
jobMasterLeaderRetrieverFunction;
+            return this;
+        }
     }
 
-    public void setResourceManagerLeaderElectionService(
-            LeaderElectionService leaderElectionService) {
-        this.resourceManagerLeaderElectionService = leaderElectionService;
-    }
+    public static class EmbeddedLeaderElection extends 
TestingHighAvailabilityServices {

Review comment:
       The naming is confusing. Can we rename this to something like 
`TestingHighAvailabilityServicesWithManualLeaderElection` or 
`TestingHAServicesWithManualLeaderElection` or 
`ManualLeaderElectionHAServices`? I initially got confused when reading the 
code and seeing that the `EmbeddedBuilder` builds an `EmbeddedLeaderElection`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
##########
@@ -38,228 +48,457 @@
  * A variant of the HighAvailabilityServices for testing. Each individual 
service can be set to an
  * arbitrary implementation, such as a mock or default service.
  */
-public class TestingHighAvailabilityServices implements 
HighAvailabilityServices {
-
-    private volatile LeaderRetrievalService resourceManagerLeaderRetriever;
-
-    private volatile LeaderRetrievalService dispatcherLeaderRetriever;
-
-    private volatile LeaderRetrievalService clusterRestEndpointLeaderRetriever;
+public abstract class TestingHighAvailabilityServices implements 
HighAvailabilityServices {

Review comment:
       ~~Don't you think that it's worth it splitting this class up? There are 
so many implementations with various members that it's easy to lose track of 
which subclass implements what functionality.~~
   
   It looks like we could get rid of the `Embedded*` implementation, considering 
that it's only used in the `ApplicationDispatcherBootstrapITCase`, where it's 
not utilized as far as I can see. That would help make this code less 
complex.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
##########
@@ -38,228 +48,457 @@
  * A variant of the HighAvailabilityServices for testing. Each individual 
service can be set to an
  * arbitrary implementation, such as a mock or default service.
  */
-public class TestingHighAvailabilityServices implements 
HighAvailabilityServices {
-
-    private volatile LeaderRetrievalService resourceManagerLeaderRetriever;
-
-    private volatile LeaderRetrievalService dispatcherLeaderRetriever;
-
-    private volatile LeaderRetrievalService clusterRestEndpointLeaderRetriever;
+public abstract class TestingHighAvailabilityServices implements 
HighAvailabilityServices {
+
+    public static EmptyBuilder newBuilder() {
+        return new EmptyBuilder();
+    }
+
+    public static EmptyBuilder newStandaloneBuilder() {
+        return new EmptyBuilder()
+                .setCheckpointRecoveryFactory(new 
StandaloneCheckpointRecoveryFactory())
+                .setJobGraphStore(new StandaloneJobGraphStore())
+                .setDispatcherLeaderRetriever(
+                        new StandaloneLeaderRetrievalService(
+                                "localhost", 
HighAvailabilityServices.DEFAULT_LEADER_ID))
+                .setResourceManagerLeaderRetriever(
+                        new StandaloneLeaderRetrievalService(
+                                "localhost", 
HighAvailabilityServices.DEFAULT_LEADER_ID))
+                .setClusterRestEndpointLeaderRetriever(
+                        new StandaloneLeaderRetrievalService(
+                                "localhost", 
HighAvailabilityServices.DEFAULT_LEADER_ID))
+                .setDispatcherLeaderElectionService(new 
StandaloneLeaderElectionService())
+                .setResourceManagerLeaderElectionService(new 
StandaloneLeaderElectionService())
+                .setClusterRestEndpointLeaderElectionService(new 
StandaloneLeaderElectionService())
+                .setJobMasterLeaderRetrieverFunction(
+                        jobId ->
+                                new StandaloneLeaderRetrievalService(
+                                        "localhost", 
HighAvailabilityServices.DEFAULT_LEADER_ID))
+                .setJobMasterLeaderElectionServiceFunction(
+                        jobId -> new StandaloneLeaderElectionService());
+    }
+
+    public static EmbeddedBuilder newEmbeddedBuilder(Executor executor) {
+        return new EmbeddedBuilder(executor);
+    }
+
+    public abstract static class Builder<T extends Builder<T>> {
+
+        protected @Nullable CheckpointRecoveryFactory 
checkpointRecoveryFactory;
+        protected @Nullable JobGraphStore jobGraphStore;
+        protected JobResultStore jobResultStore = new EmbeddedJobResultStore();
+        protected CompletableFuture<Void> closeFuture = new 
CompletableFuture<>();
+        protected CompletableFuture<Void> closeAndCleanupAllDataFuture = new 
CompletableFuture<>();
+        protected CompletableFuture<JobID> globalCleanupFuture = new 
CompletableFuture<>();
+
+        private Builder() {}
+
+        public T setCheckpointRecoveryFactory(CheckpointRecoveryFactory 
checkpointRecoveryFactory) {
+            this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private volatile Function<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrieverFunction =
-            ignored -> null;
+        public T setJobGraphStore(JobGraphStore jobGraphStore) {
+            this.jobGraphStore = jobGraphStore;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private volatile Function<JobID, LeaderElectionService> 
jobMasterLeaderElectionServiceFunction =
-            ignored -> null;
+        public T setJobResultStore(JobResultStore jobResultStore) {
+            this.jobResultStore = jobResultStore;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private ConcurrentHashMap<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrievers =
-            new ConcurrentHashMap<>();
+        public T setCloseFuture(CompletableFuture<Void> closeFuture) {
+            this.closeFuture = closeFuture;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private ConcurrentHashMap<JobID, LeaderElectionService> 
jobManagerLeaderElectionServices =
-            new ConcurrentHashMap<>();
+        public T setCloseAndCleanupAllDataFuture(
+                CompletableFuture<Void> closeAndCleanupAllDataFuture) {
+            this.closeAndCleanupAllDataFuture = closeAndCleanupAllDataFuture;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private volatile LeaderElectionService 
resourceManagerLeaderElectionService;
+        public T setGlobalCleanupFuture(CompletableFuture<JobID> 
globalCleanupFuture) {
+            this.globalCleanupFuture = globalCleanupFuture;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private volatile LeaderElectionService dispatcherLeaderElectionService;
+        public abstract TestingHighAvailabilityServices build();
+    }
 
-    private volatile LeaderElectionService 
clusterRestEndpointLeaderElectionService;
+    public static class EmbeddedBuilder extends Builder<EmbeddedBuilder> {
 
-    private volatile CheckpointRecoveryFactory checkpointRecoveryFactory;
+        private final Executor executor;
 
-    private volatile JobGraphStore jobGraphStore;
+        private EmbeddedBuilder(Executor executor) {
+            this.executor = executor;
+        }
 
-    private volatile JobResultStore jobResultStore = new 
EmbeddedJobResultStore();
+        @Override
+        public TestingHighAvailabilityServices build() {
+            try {
+                return new EmbeddedLeaderElection(
+                        checkpointRecoveryFactory,
+                        jobGraphStore,
+                        jobResultStore,
+                        closeFuture,
+                        closeAndCleanupAllDataFuture,
+                        globalCleanupFuture,
+                        new EmbeddedHaServicesWithLeadershipControl(executor));
+            } catch (Exception e) {
+                throw new IllegalStateException("Error building embedded 
services.", e);
+            }
+        }
+    }
 
-    private CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+    public static class EmptyBuilder extends Builder<EmptyBuilder> {
+
+        private LeaderRetrievalService dispatcherLeaderRetriever;
+        private LeaderRetrievalService resourceManagerLeaderRetriever;
+        private LeaderRetrievalService clusterRestEndpointLeaderRetriever;
+
+        private LeaderElectionService resourceManagerLeaderElectionService;
+        private LeaderElectionService dispatcherLeaderElectionService;
+        private LeaderElectionService clusterRestEndpointLeaderElectionService;
+
+        private Function<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrieverFunction =
+                ignored -> null;
+        private Function<JobID, LeaderElectionService> 
jobMasterLeaderElectionServiceFunction =
+                ignored -> null;
+
+        private EmptyBuilder() {}
+
+        public TestingHighAvailabilityServices build() {
+            return new ManualLeaderElection(
+                    checkpointRecoveryFactory,
+                    jobGraphStore,
+                    jobResultStore,
+                    closeFuture,
+                    closeAndCleanupAllDataFuture,
+                    globalCleanupFuture,
+                    dispatcherLeaderRetriever,
+                    resourceManagerLeaderRetriever,
+                    clusterRestEndpointLeaderRetriever,
+                    dispatcherLeaderElectionService,
+                    resourceManagerLeaderElectionService,
+                    clusterRestEndpointLeaderElectionService,
+                    jobMasterLeaderRetrieverFunction,
+                    jobMasterLeaderElectionServiceFunction);
+        }
 
-    private CompletableFuture<Void> closeAndCleanupAllDataFuture = new 
CompletableFuture<>();
+        public EmptyBuilder setResourceManagerLeaderRetriever(
+                LeaderRetrievalService resourceManagerLeaderRetriever) {
+            this.resourceManagerLeaderRetriever = 
resourceManagerLeaderRetriever;
+            return this;
+        }
 
-    private volatile CompletableFuture<JobID> globalCleanupFuture;
+        public EmptyBuilder setDispatcherLeaderRetriever(
+                LeaderRetrievalService dispatcherLeaderRetriever) {
+            this.dispatcherLeaderRetriever = dispatcherLeaderRetriever;
+            return this;
+        }
 
-    // ------------------------------------------------------------------------
-    //  Setters for mock / testing implementations
-    // ------------------------------------------------------------------------
+        public EmptyBuilder setClusterRestEndpointLeaderRetriever(
+                final LeaderRetrievalService 
clusterRestEndpointLeaderRetriever) {
+            this.clusterRestEndpointLeaderRetriever = 
clusterRestEndpointLeaderRetriever;
+            return this;
+        }
 
-    public void setResourceManagerLeaderRetriever(
-            LeaderRetrievalService resourceManagerLeaderRetriever) {
-        this.resourceManagerLeaderRetriever = resourceManagerLeaderRetriever;
-    }
+        public EmptyBuilder setResourceManagerLeaderElectionService(
+                LeaderElectionService leaderElectionService) {
+            this.resourceManagerLeaderElectionService = leaderElectionService;
+            return this;
+        }
 
-    public void setDispatcherLeaderRetriever(LeaderRetrievalService 
dispatcherLeaderRetriever) {
-        this.dispatcherLeaderRetriever = dispatcherLeaderRetriever;
-    }
+        public EmptyBuilder setDispatcherLeaderElectionService(
+                LeaderElectionService leaderElectionService) {
+            this.dispatcherLeaderElectionService = leaderElectionService;
+            return this;
+        }
 
-    public void setClusterRestEndpointLeaderRetriever(
-            final LeaderRetrievalService clusterRestEndpointLeaderRetriever) {
-        this.clusterRestEndpointLeaderRetriever = 
clusterRestEndpointLeaderRetriever;
-    }
+        public EmptyBuilder setClusterRestEndpointLeaderElectionService(
+                final LeaderElectionService 
clusterRestEndpointLeaderElectionService) {
+            this.clusterRestEndpointLeaderElectionService =
+                    clusterRestEndpointLeaderElectionService;
+            return this;
+        }
 
-    public void setJobMasterLeaderRetriever(
-            JobID jobID, LeaderRetrievalService jobMasterLeaderRetriever) {
-        this.jobMasterLeaderRetrievers.put(jobID, jobMasterLeaderRetriever);
-    }
+        public EmptyBuilder setJobMasterLeaderElectionServiceFunction(
+                Function<JobID, LeaderElectionService> 
jobMasterLeaderElectionServiceFunction) {
+            this.jobMasterLeaderElectionServiceFunction = 
jobMasterLeaderElectionServiceFunction;
+            return this;
+        }
 
-    public void setJobMasterLeaderElectionService(
-            JobID jobID, LeaderElectionService leaderElectionService) {
-        this.jobManagerLeaderElectionServices.put(jobID, 
leaderElectionService);
+        public EmptyBuilder setJobMasterLeaderRetrieverFunction(
+                Function<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrieverFunction) {
+            this.jobMasterLeaderRetrieverFunction = 
jobMasterLeaderRetrieverFunction;
+            return this;
+        }
     }
 
-    public void setResourceManagerLeaderElectionService(
-            LeaderElectionService leaderElectionService) {
-        this.resourceManagerLeaderElectionService = leaderElectionService;
-    }
+    public static class EmbeddedLeaderElection extends 
TestingHighAvailabilityServices {
+
+        private final EmbeddedHaServicesWithLeadershipControl embeddedServices;
+
+        private EmbeddedLeaderElection(
+                @Nullable CheckpointRecoveryFactory checkpointRecoveryFactory,
+                @Nullable JobGraphStore jobGraphStore,
+                JobResultStore jobResultStore,
+                CompletableFuture<Void> closeFuture,
+                CompletableFuture<Void> closeAndCleanupAllDataFuture,
+                CompletableFuture<JobID> globalCleanupFuture,
+                EmbeddedHaServicesWithLeadershipControl embeddedServices)
+                throws Exception {
+            super(
+                    MoreObjects.firstNonNull(
+                            checkpointRecoveryFactory,
+                            embeddedServices.getCheckpointRecoveryFactory()),
+                    MoreObjects.firstNonNull(jobGraphStore, 
embeddedServices.getJobGraphStore()),
+                    jobResultStore,
+                    closeFuture,
+                    closeAndCleanupAllDataFuture,
+                    globalCleanupFuture);
+            this.embeddedServices = embeddedServices;
+        }
 
-    public void setDispatcherLeaderElectionService(LeaderElectionService 
leaderElectionService) {
-        this.dispatcherLeaderElectionService = leaderElectionService;
-    }
+        @Override
+        public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+            return embeddedServices.getResourceManagerLeaderRetriever();
+        }
 
-    public void setClusterRestEndpointLeaderElectionService(
-            final LeaderElectionService 
clusterRestEndpointLeaderElectionService) {
-        this.clusterRestEndpointLeaderElectionService = 
clusterRestEndpointLeaderElectionService;
-    }
+        @Override
+        public LeaderRetrievalService getDispatcherLeaderRetriever() {
+            return embeddedServices.getDispatcherLeaderRetriever();
+        }
 
-    public void setCheckpointRecoveryFactory(CheckpointRecoveryFactory 
checkpointRecoveryFactory) {
-        this.checkpointRecoveryFactory = checkpointRecoveryFactory;
-    }
+        @Override
+        public LeaderRetrievalService getJobManagerLeaderRetriever(JobID 
jobId) {
+            return embeddedServices.getJobManagerLeaderRetriever(jobId);
+        }
 
-    public void setJobGraphStore(JobGraphStore jobGraphStore) {
-        this.jobGraphStore = jobGraphStore;
-    }
+        @Override
+        public LeaderRetrievalService getJobManagerLeaderRetriever(
+                JobID jobId, String defaultJobManagerAddress) {
+            return embeddedServices.getJobManagerLeaderRetriever(jobId, 
defaultJobManagerAddress);
+        }
 
-    public void setJobResultStore(JobResultStore jobResultStore) {
-        this.jobResultStore = jobResultStore;
-    }
+        @Override
+        public LeaderElectionService getResourceManagerLeaderElectionService() 
{
+            return embeddedServices.getResourceManagerLeaderElectionService();
+        }
 
-    public void setJobMasterLeaderElectionServiceFunction(
-            Function<JobID, LeaderElectionService> 
jobMasterLeaderElectionServiceFunction) {
-        this.jobMasterLeaderElectionServiceFunction = 
jobMasterLeaderElectionServiceFunction;
-    }
+        @Override
+        public LeaderElectionService getDispatcherLeaderElectionService() {
+            return embeddedServices.getResourceManagerLeaderElectionService();
+        }
 
-    public void setJobMasterLeaderRetrieverFunction(
-            Function<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrieverFunction) {
-        this.jobMasterLeaderRetrieverFunction = 
jobMasterLeaderRetrieverFunction;
-    }
+        @Override
+        public LeaderElectionService getJobManagerLeaderElectionService(JobID 
jobID) {
+            return embeddedServices.getJobManagerLeaderElectionService(jobID);
+        }
 
-    public void setCloseFuture(CompletableFuture<Void> closeFuture) {
-        this.closeFuture = closeFuture;
-    }
+        @Override
+        public LeaderElectionService 
getClusterRestEndpointLeaderElectionService() {
+            return 
embeddedServices.getClusterRestEndpointLeaderElectionService();
+        }
 
-    public void setCloseAndCleanupAllDataFuture(
-            CompletableFuture<Void> closeAndCleanupAllDataFuture) {
-        this.closeAndCleanupAllDataFuture = closeAndCleanupAllDataFuture;
-    }
+        @Override
+        public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
+            return embeddedServices.getClusterRestEndpointLeaderRetriever();
+        }
 
-    public void setGlobalCleanupFuture(CompletableFuture<JobID> 
globalCleanupFuture) {
-        this.globalCleanupFuture = globalCleanupFuture;
+        @Override
+        public HaLeadershipControl getLeadershipControl() {
+            return embeddedServices;
+        }
     }
 
-    // ------------------------------------------------------------------------
-    //  HA Services Methods
-    // ------------------------------------------------------------------------
+    public static class ManualLeaderElection extends 
TestingHighAvailabilityServices {

Review comment:
       Same as what I already suggested for the `EmbeddedLeaderElection`: Can 
we rename it to something indicating that it's actually an HAServices 
implementation? It looks like that's because the old implementation implemented 
Builder functionality with setters as well. That means that we could move this 
check into the constructors.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
##########
@@ -38,228 +48,457 @@
  * A variant of the HighAvailabilityServices for testing. Each individual 
service can be set to an
  * arbitrary implementation, such as a mock or default service.
  */
-public class TestingHighAvailabilityServices implements 
HighAvailabilityServices {
-
-    private volatile LeaderRetrievalService resourceManagerLeaderRetriever;
-
-    private volatile LeaderRetrievalService dispatcherLeaderRetriever;
-
-    private volatile LeaderRetrievalService clusterRestEndpointLeaderRetriever;
+public abstract class TestingHighAvailabilityServices implements 
HighAvailabilityServices {
+
+    public static EmptyBuilder newBuilder() {

Review comment:
       Could we add some JavaDoc to the `new*Builder` methods to describe the 
use-cases in which each of these methods should be used?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
##########
@@ -38,228 +48,457 @@
  * A variant of the HighAvailabilityServices for testing. Each individual 
service can be set to an
  * arbitrary implementation, such as a mock or default service.
  */
-public class TestingHighAvailabilityServices implements 
HighAvailabilityServices {
-
-    private volatile LeaderRetrievalService resourceManagerLeaderRetriever;
-
-    private volatile LeaderRetrievalService dispatcherLeaderRetriever;
-
-    private volatile LeaderRetrievalService clusterRestEndpointLeaderRetriever;
+public abstract class TestingHighAvailabilityServices implements 
HighAvailabilityServices {
+
+    public static EmptyBuilder newBuilder() {
+        return new EmptyBuilder();
+    }
+
+    public static EmptyBuilder newStandaloneBuilder() {
+        return new EmptyBuilder()
+                .setCheckpointRecoveryFactory(new 
StandaloneCheckpointRecoveryFactory())
+                .setJobGraphStore(new StandaloneJobGraphStore())
+                .setDispatcherLeaderRetriever(
+                        new StandaloneLeaderRetrievalService(
+                                "localhost", 
HighAvailabilityServices.DEFAULT_LEADER_ID))
+                .setResourceManagerLeaderRetriever(
+                        new StandaloneLeaderRetrievalService(
+                                "localhost", 
HighAvailabilityServices.DEFAULT_LEADER_ID))
+                .setClusterRestEndpointLeaderRetriever(
+                        new StandaloneLeaderRetrievalService(
+                                "localhost", 
HighAvailabilityServices.DEFAULT_LEADER_ID))
+                .setDispatcherLeaderElectionService(new 
StandaloneLeaderElectionService())
+                .setResourceManagerLeaderElectionService(new 
StandaloneLeaderElectionService())
+                .setClusterRestEndpointLeaderElectionService(new 
StandaloneLeaderElectionService())
+                .setJobMasterLeaderRetrieverFunction(
+                        jobId ->
+                                new StandaloneLeaderRetrievalService(
+                                        "localhost", 
HighAvailabilityServices.DEFAULT_LEADER_ID))
+                .setJobMasterLeaderElectionServiceFunction(
+                        jobId -> new StandaloneLeaderElectionService());
+    }
+
+    public static EmbeddedBuilder newEmbeddedBuilder(Executor executor) {
+        return new EmbeddedBuilder(executor);
+    }
+
+    public abstract static class Builder<T extends Builder<T>> {
+
+        protected @Nullable CheckpointRecoveryFactory 
checkpointRecoveryFactory;
+        protected @Nullable JobGraphStore jobGraphStore;
+        protected JobResultStore jobResultStore = new EmbeddedJobResultStore();
+        protected CompletableFuture<Void> closeFuture = new 
CompletableFuture<>();
+        protected CompletableFuture<Void> closeAndCleanupAllDataFuture = new 
CompletableFuture<>();
+        protected CompletableFuture<JobID> globalCleanupFuture = new 
CompletableFuture<>();
+
+        private Builder() {}
+
+        public T setCheckpointRecoveryFactory(CheckpointRecoveryFactory 
checkpointRecoveryFactory) {
+            this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private volatile Function<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrieverFunction =
-            ignored -> null;
+        public T setJobGraphStore(JobGraphStore jobGraphStore) {
+            this.jobGraphStore = jobGraphStore;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private volatile Function<JobID, LeaderElectionService> 
jobMasterLeaderElectionServiceFunction =
-            ignored -> null;
+        public T setJobResultStore(JobResultStore jobResultStore) {
+            this.jobResultStore = jobResultStore;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private ConcurrentHashMap<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrievers =
-            new ConcurrentHashMap<>();
+        public T setCloseFuture(CompletableFuture<Void> closeFuture) {
+            this.closeFuture = closeFuture;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private ConcurrentHashMap<JobID, LeaderElectionService> 
jobManagerLeaderElectionServices =
-            new ConcurrentHashMap<>();
+        public T setCloseAndCleanupAllDataFuture(
+                CompletableFuture<Void> closeAndCleanupAllDataFuture) {
+            this.closeAndCleanupAllDataFuture = closeAndCleanupAllDataFuture;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private volatile LeaderElectionService 
resourceManagerLeaderElectionService;
+        public T setGlobalCleanupFuture(CompletableFuture<JobID> 
globalCleanupFuture) {
+            this.globalCleanupFuture = globalCleanupFuture;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private volatile LeaderElectionService dispatcherLeaderElectionService;
+        public abstract TestingHighAvailabilityServices build();
+    }
 
-    private volatile LeaderElectionService 
clusterRestEndpointLeaderElectionService;
+    public static class EmbeddedBuilder extends Builder<EmbeddedBuilder> {
 
-    private volatile CheckpointRecoveryFactory checkpointRecoveryFactory;
+        private final Executor executor;
 
-    private volatile JobGraphStore jobGraphStore;
+        private EmbeddedBuilder(Executor executor) {
+            this.executor = executor;
+        }
 
-    private volatile JobResultStore jobResultStore = new 
EmbeddedJobResultStore();
+        @Override
+        public TestingHighAvailabilityServices build() {
+            try {
+                return new EmbeddedLeaderElection(
+                        checkpointRecoveryFactory,
+                        jobGraphStore,
+                        jobResultStore,
+                        closeFuture,
+                        closeAndCleanupAllDataFuture,
+                        globalCleanupFuture,
+                        new EmbeddedHaServicesWithLeadershipControl(executor));
+            } catch (Exception e) {
+                throw new IllegalStateException("Error building embedded 
services.", e);
+            }
+        }
+    }
 
-    private CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+    public static class EmptyBuilder extends Builder<EmptyBuilder> {

Review comment:
       `BuilderWithLeadershipServices` would be a more descriptive name here. 
WDYT? 🤔 

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -162,7 +165,11 @@ public void setUp() throws Exception {
         jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
         jobId = jobGraph.getJobID();
         jobMasterLeaderElectionService = new TestingLeaderElectionService();
-        haServices.setJobMasterLeaderElectionService(jobId, 
jobMasterLeaderElectionService);
+        haServicesBuilder.setJobMasterLeaderElectionServiceFunction(
+                jobId -> {
+                    assertEquals(jobGraph.getJobID(), jobId);

Review comment:
       nit: renaming it to `actualJobId` makes it easier to identify which one 
is the expected one

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
##########
@@ -38,228 +48,457 @@
  * A variant of the HighAvailabilityServices for testing. Each individual 
service can be set to an
  * arbitrary implementation, such as a mock or default service.
  */
-public class TestingHighAvailabilityServices implements 
HighAvailabilityServices {
-
-    private volatile LeaderRetrievalService resourceManagerLeaderRetriever;
-
-    private volatile LeaderRetrievalService dispatcherLeaderRetriever;
-
-    private volatile LeaderRetrievalService clusterRestEndpointLeaderRetriever;
+public abstract class TestingHighAvailabilityServices implements 
HighAvailabilityServices {
+
+    public static EmptyBuilder newBuilder() {
+        return new EmptyBuilder();
+    }
+
+    public static EmptyBuilder newStandaloneBuilder() {
+        return new EmptyBuilder()
+                .setCheckpointRecoveryFactory(new 
StandaloneCheckpointRecoveryFactory())
+                .setJobGraphStore(new StandaloneJobGraphStore())
+                .setDispatcherLeaderRetriever(
+                        new StandaloneLeaderRetrievalService(
+                                "localhost", 
HighAvailabilityServices.DEFAULT_LEADER_ID))
+                .setResourceManagerLeaderRetriever(
+                        new StandaloneLeaderRetrievalService(
+                                "localhost", 
HighAvailabilityServices.DEFAULT_LEADER_ID))
+                .setClusterRestEndpointLeaderRetriever(
+                        new StandaloneLeaderRetrievalService(
+                                "localhost", 
HighAvailabilityServices.DEFAULT_LEADER_ID))
+                .setDispatcherLeaderElectionService(new 
StandaloneLeaderElectionService())
+                .setResourceManagerLeaderElectionService(new 
StandaloneLeaderElectionService())
+                .setClusterRestEndpointLeaderElectionService(new 
StandaloneLeaderElectionService())
+                .setJobMasterLeaderRetrieverFunction(
+                        jobId ->
+                                new StandaloneLeaderRetrievalService(
+                                        "localhost", 
HighAvailabilityServices.DEFAULT_LEADER_ID))
+                .setJobMasterLeaderElectionServiceFunction(
+                        jobId -> new StandaloneLeaderElectionService());
+    }
+
+    public static EmbeddedBuilder newEmbeddedBuilder(Executor executor) {
+        return new EmbeddedBuilder(executor);
+    }
+
+    public abstract static class Builder<T extends Builder<T>> {
+
+        protected @Nullable CheckpointRecoveryFactory 
checkpointRecoveryFactory;
+        protected @Nullable JobGraphStore jobGraphStore;
+        protected JobResultStore jobResultStore = new EmbeddedJobResultStore();

Review comment:
       I think we shouldn't initialize the `JobResultStore` here with a default 
value. We should have one dedicated location for the initialization of all 
these components: either the `Builder` implementation (as we do right now for 
the `JobResultStore`) or the `newStandaloneBuilder()` factory method (as we do 
for the `CheckpointRecoveryFactory` and the `JobGraphStore`); I'd prefer the 
latter. That leaves the reader with a single location to check for default 
behavior.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
##########
@@ -74,7 +75,7 @@
     @Before
     public void setUp() throws Exception {
         super.setUp();
-        haServices.setCheckpointRecoveryFactory(
+        haServicesBuilder.setCheckpointRecoveryFactory(
                 new 
PerJobCheckpointRecoveryFactory<EmbeddedCompletedCheckpointStore>(
                         (maxCheckpoints, previous, sharedStateRegistryFactory, 
ioExecutor) -> {
                             if (previous != null) {

Review comment:
       I guess that function can be refactored similarly to the one in 
`EmbeddedHaServicesWithLeadershipControl` 😇 

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
##########
@@ -38,228 +48,457 @@
  * A variant of the HighAvailabilityServices for testing. Each individual 
service can be set to an
  * arbitrary implementation, such as a mock or default service.
  */
-public class TestingHighAvailabilityServices implements 
HighAvailabilityServices {
-
-    private volatile LeaderRetrievalService resourceManagerLeaderRetriever;
-
-    private volatile LeaderRetrievalService dispatcherLeaderRetriever;
-
-    private volatile LeaderRetrievalService clusterRestEndpointLeaderRetriever;
+public abstract class TestingHighAvailabilityServices implements 
HighAvailabilityServices {
+
+    public static EmptyBuilder newBuilder() {
+        return new EmptyBuilder();
+    }
+
+    public static EmptyBuilder newStandaloneBuilder() {
+        return new EmptyBuilder()
+                .setCheckpointRecoveryFactory(new 
StandaloneCheckpointRecoveryFactory())
+                .setJobGraphStore(new StandaloneJobGraphStore())
+                .setDispatcherLeaderRetriever(
+                        new StandaloneLeaderRetrievalService(
+                                "localhost", 
HighAvailabilityServices.DEFAULT_LEADER_ID))
+                .setResourceManagerLeaderRetriever(
+                        new StandaloneLeaderRetrievalService(
+                                "localhost", 
HighAvailabilityServices.DEFAULT_LEADER_ID))
+                .setClusterRestEndpointLeaderRetriever(
+                        new StandaloneLeaderRetrievalService(
+                                "localhost", 
HighAvailabilityServices.DEFAULT_LEADER_ID))
+                .setDispatcherLeaderElectionService(new 
StandaloneLeaderElectionService())
+                .setResourceManagerLeaderElectionService(new 
StandaloneLeaderElectionService())
+                .setClusterRestEndpointLeaderElectionService(new 
StandaloneLeaderElectionService())
+                .setJobMasterLeaderRetrieverFunction(
+                        jobId ->
+                                new StandaloneLeaderRetrievalService(
+                                        "localhost", 
HighAvailabilityServices.DEFAULT_LEADER_ID))
+                .setJobMasterLeaderElectionServiceFunction(
+                        jobId -> new StandaloneLeaderElectionService());
+    }
+
+    public static EmbeddedBuilder newEmbeddedBuilder(Executor executor) {
+        return new EmbeddedBuilder(executor);
+    }
+
+    public abstract static class Builder<T extends Builder<T>> {
+
+        protected @Nullable CheckpointRecoveryFactory 
checkpointRecoveryFactory;
+        protected @Nullable JobGraphStore jobGraphStore;
+        protected JobResultStore jobResultStore = new EmbeddedJobResultStore();
+        protected CompletableFuture<Void> closeFuture = new 
CompletableFuture<>();
+        protected CompletableFuture<Void> closeAndCleanupAllDataFuture = new 
CompletableFuture<>();
+        protected CompletableFuture<JobID> globalCleanupFuture = new 
CompletableFuture<>();
+
+        private Builder() {}
+
+        public T setCheckpointRecoveryFactory(CheckpointRecoveryFactory 
checkpointRecoveryFactory) {
+            this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private volatile Function<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrieverFunction =
-            ignored -> null;
+        public T setJobGraphStore(JobGraphStore jobGraphStore) {
+            this.jobGraphStore = jobGraphStore;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private volatile Function<JobID, LeaderElectionService> 
jobMasterLeaderElectionServiceFunction =
-            ignored -> null;
+        public T setJobResultStore(JobResultStore jobResultStore) {
+            this.jobResultStore = jobResultStore;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private ConcurrentHashMap<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrievers =
-            new ConcurrentHashMap<>();
+        public T setCloseFuture(CompletableFuture<Void> closeFuture) {
+            this.closeFuture = closeFuture;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private ConcurrentHashMap<JobID, LeaderElectionService> 
jobManagerLeaderElectionServices =
-            new ConcurrentHashMap<>();
+        public T setCloseAndCleanupAllDataFuture(
+                CompletableFuture<Void> closeAndCleanupAllDataFuture) {
+            this.closeAndCleanupAllDataFuture = closeAndCleanupAllDataFuture;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private volatile LeaderElectionService 
resourceManagerLeaderElectionService;
+        public T setGlobalCleanupFuture(CompletableFuture<JobID> 
globalCleanupFuture) {
+            this.globalCleanupFuture = globalCleanupFuture;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private volatile LeaderElectionService dispatcherLeaderElectionService;
+        public abstract TestingHighAvailabilityServices build();
+    }
 
-    private volatile LeaderElectionService 
clusterRestEndpointLeaderElectionService;
+    public static class EmbeddedBuilder extends Builder<EmbeddedBuilder> {
 
-    private volatile CheckpointRecoveryFactory checkpointRecoveryFactory;
+        private final Executor executor;
 
-    private volatile JobGraphStore jobGraphStore;
+        private EmbeddedBuilder(Executor executor) {
+            this.executor = executor;
+        }
 
-    private volatile JobResultStore jobResultStore = new 
EmbeddedJobResultStore();
+        @Override
+        public TestingHighAvailabilityServices build() {
+            try {
+                return new EmbeddedLeaderElection(
+                        checkpointRecoveryFactory,
+                        jobGraphStore,
+                        jobResultStore,
+                        closeFuture,
+                        closeAndCleanupAllDataFuture,
+                        globalCleanupFuture,
+                        new EmbeddedHaServicesWithLeadershipControl(executor));
+            } catch (Exception e) {
+                throw new IllegalStateException("Error building embedded 
services.", e);
+            }
+        }
+    }
 
-    private CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+    public static class EmptyBuilder extends Builder<EmptyBuilder> {
+
+        private LeaderRetrievalService dispatcherLeaderRetriever;
+        private LeaderRetrievalService resourceManagerLeaderRetriever;
+        private LeaderRetrievalService clusterRestEndpointLeaderRetriever;
+
+        private LeaderElectionService resourceManagerLeaderElectionService;
+        private LeaderElectionService dispatcherLeaderElectionService;
+        private LeaderElectionService clusterRestEndpointLeaderElectionService;
+
+        private Function<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrieverFunction =
+                ignored -> null;
+        private Function<JobID, LeaderElectionService> 
jobMasterLeaderElectionServiceFunction =
+                ignored -> null;
+
+        private EmptyBuilder() {}
+
+        public TestingHighAvailabilityServices build() {
+            return new ManualLeaderElection(
+                    checkpointRecoveryFactory,
+                    jobGraphStore,
+                    jobResultStore,
+                    closeFuture,
+                    closeAndCleanupAllDataFuture,
+                    globalCleanupFuture,
+                    dispatcherLeaderRetriever,
+                    resourceManagerLeaderRetriever,
+                    clusterRestEndpointLeaderRetriever,
+                    dispatcherLeaderElectionService,
+                    resourceManagerLeaderElectionService,
+                    clusterRestEndpointLeaderElectionService,
+                    jobMasterLeaderRetrieverFunction,
+                    jobMasterLeaderElectionServiceFunction);
+        }
 
-    private CompletableFuture<Void> closeAndCleanupAllDataFuture = new 
CompletableFuture<>();
+        public EmptyBuilder setResourceManagerLeaderRetriever(
+                LeaderRetrievalService resourceManagerLeaderRetriever) {
+            this.resourceManagerLeaderRetriever = 
resourceManagerLeaderRetriever;
+            return this;
+        }
 
-    private volatile CompletableFuture<JobID> globalCleanupFuture;
+        public EmptyBuilder setDispatcherLeaderRetriever(
+                LeaderRetrievalService dispatcherLeaderRetriever) {
+            this.dispatcherLeaderRetriever = dispatcherLeaderRetriever;
+            return this;
+        }
 
-    // ------------------------------------------------------------------------
-    //  Setters for mock / testing implementations
-    // ------------------------------------------------------------------------
+        public EmptyBuilder setClusterRestEndpointLeaderRetriever(
+                final LeaderRetrievalService 
clusterRestEndpointLeaderRetriever) {
+            this.clusterRestEndpointLeaderRetriever = 
clusterRestEndpointLeaderRetriever;
+            return this;
+        }
 
-    public void setResourceManagerLeaderRetriever(
-            LeaderRetrievalService resourceManagerLeaderRetriever) {
-        this.resourceManagerLeaderRetriever = resourceManagerLeaderRetriever;
-    }
+        public EmptyBuilder setResourceManagerLeaderElectionService(
+                LeaderElectionService leaderElectionService) {
+            this.resourceManagerLeaderElectionService = leaderElectionService;
+            return this;
+        }
 
-    public void setDispatcherLeaderRetriever(LeaderRetrievalService 
dispatcherLeaderRetriever) {
-        this.dispatcherLeaderRetriever = dispatcherLeaderRetriever;
-    }
+        public EmptyBuilder setDispatcherLeaderElectionService(
+                LeaderElectionService leaderElectionService) {
+            this.dispatcherLeaderElectionService = leaderElectionService;
+            return this;
+        }
 
-    public void setClusterRestEndpointLeaderRetriever(
-            final LeaderRetrievalService clusterRestEndpointLeaderRetriever) {
-        this.clusterRestEndpointLeaderRetriever = 
clusterRestEndpointLeaderRetriever;
-    }
+        public EmptyBuilder setClusterRestEndpointLeaderElectionService(
+                final LeaderElectionService 
clusterRestEndpointLeaderElectionService) {
+            this.clusterRestEndpointLeaderElectionService =
+                    clusterRestEndpointLeaderElectionService;
+            return this;
+        }
 
-    public void setJobMasterLeaderRetriever(
-            JobID jobID, LeaderRetrievalService jobMasterLeaderRetriever) {
-        this.jobMasterLeaderRetrievers.put(jobID, jobMasterLeaderRetriever);
-    }
+        public EmptyBuilder setJobMasterLeaderElectionServiceFunction(
+                Function<JobID, LeaderElectionService> 
jobMasterLeaderElectionServiceFunction) {
+            this.jobMasterLeaderElectionServiceFunction = 
jobMasterLeaderElectionServiceFunction;
+            return this;
+        }
 
-    public void setJobMasterLeaderElectionService(
-            JobID jobID, LeaderElectionService leaderElectionService) {
-        this.jobManagerLeaderElectionServices.put(jobID, 
leaderElectionService);
+        public EmptyBuilder setJobMasterLeaderRetrieverFunction(
+                Function<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrieverFunction) {
+            this.jobMasterLeaderRetrieverFunction = 
jobMasterLeaderRetrieverFunction;
+            return this;
+        }
     }
 
-    public void setResourceManagerLeaderElectionService(
-            LeaderElectionService leaderElectionService) {
-        this.resourceManagerLeaderElectionService = leaderElectionService;
-    }
+    public static class EmbeddedLeaderElection extends 
TestingHighAvailabilityServices {
+
+        private final EmbeddedHaServicesWithLeadershipControl embeddedServices;
+
+        private EmbeddedLeaderElection(
+                @Nullable CheckpointRecoveryFactory checkpointRecoveryFactory,
+                @Nullable JobGraphStore jobGraphStore,
+                JobResultStore jobResultStore,
+                CompletableFuture<Void> closeFuture,
+                CompletableFuture<Void> closeAndCleanupAllDataFuture,
+                CompletableFuture<JobID> globalCleanupFuture,
+                EmbeddedHaServicesWithLeadershipControl embeddedServices)
+                throws Exception {
+            super(
+                    MoreObjects.firstNonNull(
+                            checkpointRecoveryFactory,
+                            embeddedServices.getCheckpointRecoveryFactory()),
+                    MoreObjects.firstNonNull(jobGraphStore, 
embeddedServices.getJobGraphStore()),
+                    jobResultStore,
+                    closeFuture,
+                    closeAndCleanupAllDataFuture,
+                    globalCleanupFuture);
+            this.embeddedServices = embeddedServices;
+        }
 
-    public void setDispatcherLeaderElectionService(LeaderElectionService 
leaderElectionService) {
-        this.dispatcherLeaderElectionService = leaderElectionService;
-    }
+        @Override
+        public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+            return embeddedServices.getResourceManagerLeaderRetriever();
+        }
 
-    public void setClusterRestEndpointLeaderElectionService(
-            final LeaderElectionService 
clusterRestEndpointLeaderElectionService) {
-        this.clusterRestEndpointLeaderElectionService = 
clusterRestEndpointLeaderElectionService;
-    }
+        @Override
+        public LeaderRetrievalService getDispatcherLeaderRetriever() {
+            return embeddedServices.getDispatcherLeaderRetriever();
+        }
 
-    public void setCheckpointRecoveryFactory(CheckpointRecoveryFactory 
checkpointRecoveryFactory) {
-        this.checkpointRecoveryFactory = checkpointRecoveryFactory;
-    }
+        @Override
+        public LeaderRetrievalService getJobManagerLeaderRetriever(JobID 
jobId) {
+            return embeddedServices.getJobManagerLeaderRetriever(jobId);
+        }
 
-    public void setJobGraphStore(JobGraphStore jobGraphStore) {
-        this.jobGraphStore = jobGraphStore;
-    }
+        @Override
+        public LeaderRetrievalService getJobManagerLeaderRetriever(
+                JobID jobId, String defaultJobManagerAddress) {
+            return embeddedServices.getJobManagerLeaderRetriever(jobId, 
defaultJobManagerAddress);
+        }
 
-    public void setJobResultStore(JobResultStore jobResultStore) {
-        this.jobResultStore = jobResultStore;
-    }
+        @Override
+        public LeaderElectionService getResourceManagerLeaderElectionService() 
{
+            return embeddedServices.getResourceManagerLeaderElectionService();
+        }
 
-    public void setJobMasterLeaderElectionServiceFunction(
-            Function<JobID, LeaderElectionService> 
jobMasterLeaderElectionServiceFunction) {
-        this.jobMasterLeaderElectionServiceFunction = 
jobMasterLeaderElectionServiceFunction;
-    }
+        @Override
+        public LeaderElectionService getDispatcherLeaderElectionService() {
+            return embeddedServices.getResourceManagerLeaderElectionService();
+        }
 
-    public void setJobMasterLeaderRetrieverFunction(
-            Function<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrieverFunction) {
-        this.jobMasterLeaderRetrieverFunction = 
jobMasterLeaderRetrieverFunction;
-    }
+        @Override
+        public LeaderElectionService getJobManagerLeaderElectionService(JobID 
jobID) {
+            return embeddedServices.getJobManagerLeaderElectionService(jobID);
+        }
 
-    public void setCloseFuture(CompletableFuture<Void> closeFuture) {
-        this.closeFuture = closeFuture;
-    }
+        @Override
+        public LeaderElectionService 
getClusterRestEndpointLeaderElectionService() {
+            return 
embeddedServices.getClusterRestEndpointLeaderElectionService();
+        }
 
-    public void setCloseAndCleanupAllDataFuture(
-            CompletableFuture<Void> closeAndCleanupAllDataFuture) {
-        this.closeAndCleanupAllDataFuture = closeAndCleanupAllDataFuture;
-    }
+        @Override
+        public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
+            return embeddedServices.getClusterRestEndpointLeaderRetriever();
+        }
 
-    public void setGlobalCleanupFuture(CompletableFuture<JobID> 
globalCleanupFuture) {
-        this.globalCleanupFuture = globalCleanupFuture;
+        @Override
+        public HaLeadershipControl getLeadershipControl() {
+            return embeddedServices;
+        }
     }
 
-    // ------------------------------------------------------------------------
-    //  HA Services Methods
-    // ------------------------------------------------------------------------
+    public static class ManualLeaderElection extends 
TestingHighAvailabilityServices {
+
+        @Nullable private final LeaderRetrievalService 
dispatcherLeaderRetriever;
+        @Nullable private final LeaderRetrievalService 
resourceManagerLeaderRetriever;
+        @Nullable private final LeaderRetrievalService 
clusterRestEndpointLeaderRetriever;
+        @Nullable private final LeaderElectionService 
dispatcherLeaderElectionService;
+        @Nullable private final LeaderElectionService 
resourceManagerLeaderElectionService;
+        @Nullable private final LeaderElectionService 
clusterRestEndpointLeaderElectionService;
+
+        private final ConcurrentHashMap<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrievers =
+                new ConcurrentHashMap<>();
+        private final ConcurrentHashMap<JobID, LeaderElectionService>
+                jobManagerLeaderElectionServices = new ConcurrentHashMap<>();
+
+        private final Function<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrieverFunction;
+        private final Function<JobID, LeaderElectionService> 
jobMasterLeaderElectionServiceFunction;
+
+        private ManualLeaderElection(
+                @Nullable CheckpointRecoveryFactory checkpointRecoveryFactory,
+                @Nullable JobGraphStore jobGraphStore,
+                JobResultStore jobResultStore,
+                CompletableFuture<Void> closeFuture,
+                CompletableFuture<Void> closeAndCleanupAllDataFuture,
+                CompletableFuture<JobID> globalCleanupFuture,
+                @Nullable LeaderRetrievalService dispatcherLeaderRetriever,
+                @Nullable LeaderRetrievalService 
resourceManagerLeaderRetriever,
+                @Nullable LeaderRetrievalService 
clusterRestEndpointLeaderRetriever,
+                @Nullable LeaderElectionService 
dispatcherLeaderElectionService,
+                @Nullable LeaderElectionService 
resourceManagerLeaderElectionService,
+                @Nullable LeaderElectionService 
clusterRestEndpointLeaderElectionService,
+                Function<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrieverFunction,
+                Function<JobID, LeaderElectionService> 
jobMasterLeaderElectionServiceFunction) {
+            super(
+                    checkpointRecoveryFactory,
+                    jobGraphStore,
+                    jobResultStore,
+                    closeFuture,
+                    closeAndCleanupAllDataFuture,
+                    globalCleanupFuture);
+            this.dispatcherLeaderRetriever = dispatcherLeaderRetriever;
+            this.resourceManagerLeaderRetriever = 
resourceManagerLeaderRetriever;
+            this.clusterRestEndpointLeaderRetriever = 
clusterRestEndpointLeaderRetriever;
+            this.dispatcherLeaderElectionService = 
dispatcherLeaderElectionService;
+            this.resourceManagerLeaderElectionService = 
resourceManagerLeaderElectionService;
+            this.clusterRestEndpointLeaderElectionService =
+                    clusterRestEndpointLeaderElectionService;
+            this.jobMasterLeaderRetrieverFunction = 
jobMasterLeaderRetrieverFunction;
+            this.jobMasterLeaderElectionServiceFunction = 
jobMasterLeaderElectionServiceFunction;
+        }
 
-    @Override
-    public LeaderRetrievalService getResourceManagerLeaderRetriever() {
-        LeaderRetrievalService service = this.resourceManagerLeaderRetriever;
-        if (service != null) {
-            return service;
-        } else {
-            throw new IllegalStateException("ResourceManagerLeaderRetriever 
has not been set");
+        @Override
+        public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+            if (resourceManagerLeaderRetriever == null) {

Review comment:
       What's the purpose of all the null checks being done in the method 
implementations? Can't we do this in the constructor?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
##########
@@ -291,7 +530,8 @@ public void closeAndCleanupAllData() throws Exception {
         if (globalCleanupFuture != null) {

Review comment:
       `globalCleanupFuture` is actually not nullable anymore

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
##########
@@ -182,14 +183,10 @@ public void testDirtyJobResultRecoveryInApplicationMode() 
throws Exception {
                 new JobResultEntry(
                         TestingJobResultStore.createSuccessfulJobResult(
                                 ApplicationDispatcherBootstrap.ZERO_JOB_ID)));
-        final EmbeddedHaServicesWithLeadershipControl haServices =
-                new 
EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor()) {
-
-                    @Override
-                    public JobResultStore getJobResultStore() {
-                        return jobResultStore;
-                    }
-                };
+        final TestingHighAvailabilityServices haServices =

Review comment:
       It looks like we could replace it with the standalone version, since the 
leader election functionality isn't used... That's my bad - I guess I 
didn't check that code path properly when creating the test

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
##########
@@ -132,12 +133,16 @@ public void 
testRecoverFromCheckpointAfterJobGraphRemovalOfTerminatedJobFailed()
                                 })
                         .build();
         jobGraphStore.start(null);
-        haServices.setJobGraphStore(jobGraphStore);
+        haServicesBuilder.setJobGraphStore(jobGraphStore);
 
         // Construct leader election service.
         final TestingLeaderElectionService leaderElectionService =
                 new TestingLeaderElectionService();
-        haServices.setJobMasterLeaderElectionService(jobId, 
leaderElectionService);
+        haServicesBuilder.setJobMasterLeaderElectionServiceFunction(
+                innerJobId -> {
+                    assertEquals(jobId, innerJobId);

Review comment:
       I'm not going to mention the other occurrences in the review...

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
##########
@@ -38,228 +48,457 @@
  * A variant of the HighAvailabilityServices for testing. Each individual 
service can be set to an
  * arbitrary implementation, such as a mock or default service.
  */
-public class TestingHighAvailabilityServices implements 
HighAvailabilityServices {
-
-    private volatile LeaderRetrievalService resourceManagerLeaderRetriever;
-
-    private volatile LeaderRetrievalService dispatcherLeaderRetriever;
-
-    private volatile LeaderRetrievalService clusterRestEndpointLeaderRetriever;
+public abstract class TestingHighAvailabilityServices implements 
HighAvailabilityServices {

Review comment:
       After going through the usages of each of these `Builder`s, it's 
essentially: `newBuilder` replaces `TestingHighAvailabilityServices`, 
`newStandaloneBuilder` replaces `TestingHighAvailabilityServicesBuilder` and 
`newEmbeddedBuilder` is a new implementation providing direct access to the 
leader election functionality which is only used in 
`ApplicationDispatcherBootstrapITCase` for now... 
   
   Considering that, the change might be good enough, but we should create a 
follow-up task for refactoring the structure into an easier-to-read construct. 🤔 

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
##########
@@ -291,7 +530,8 @@ public void closeAndCleanupAllData() throws Exception {
         if (globalCleanupFuture != null) {
             globalCleanupFuture.complete(jobID);
         }
-
         return FutureUtils.completedVoidFuture();
     }
+
+    public abstract HaLeadershipControl getLeadershipControl();

Review comment:
       As far as I can see, this method is only properly used by the 
`EmbeddedLeaderElection` implementation. The `Builder` implementation could 
provide another generic type parameter that defines the return type of the `build()` 
method. That's how we could make this method available only through the 
`EmbeddedLeaderElection` returned by the `EmbeddedBuilder`

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
##########
@@ -182,14 +183,10 @@ public void testDirtyJobResultRecoveryInApplicationMode() 
throws Exception {
                 new JobResultEntry(
                         TestingJobResultStore.createSuccessfulJobResult(
                                 ApplicationDispatcherBootstrap.ZERO_JOB_ID)));
-        final EmbeddedHaServicesWithLeadershipControl haServices =
-                new 
EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor()) {
-
-                    @Override
-                    public JobResultStore getJobResultStore() {
-                        return jobResultStore;
-                    }
-                };
+        final TestingHighAvailabilityServices haServices =

Review comment:
       Do we actually need the `EmbeddedHaServicesWithLeadershipControl` here? 
Isn't it more about the `EmbeddedHaServices` part of 
`EmbeddedHaServicesWithLeadershipControl` that is of value in this test? We 
don't use the leadership control at all in the test code... 🤔 

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
##########
@@ -132,12 +133,16 @@ public void 
testRecoverFromCheckpointAfterJobGraphRemovalOfTerminatedJobFailed()
                                 })
                         .build();
         jobGraphStore.start(null);
-        haServices.setJobGraphStore(jobGraphStore);
+        haServicesBuilder.setJobGraphStore(jobGraphStore);
 
         // Construct leader election service.
         final TestingLeaderElectionService leaderElectionService =
                 new TestingLeaderElectionService();
-        haServices.setJobMasterLeaderElectionService(jobId, 
leaderElectionService);
+        haServicesBuilder.setJobMasterLeaderElectionServiceFunction(
+                innerJobId -> {
+                    assertEquals(jobId, innerJobId);

Review comment:
       nit: Sticking to the test-related wording, the `innerJobId` could be 
named `actualJobId`

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -817,12 +830,16 @@ public void testJobSuspensionWhenDispatcherIsTerminated() 
throws Exception {
     @Test
     public void testJobStatusIsShownDuringTermination() throws Exception {
         final JobID blockingId = new JobID();

Review comment:
       nit: Not sure whether we want to do that as part of this PR. But moving 
the `blockedJobGraph` creation to the top of this test would help us get 
rid of `blockingId` and use `blockedJobGraph.getJobID()` instead. That would 
make it clearer where this jobID is coming from...

##########
File path: 
flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
##########
@@ -182,14 +183,10 @@ public void testDirtyJobResultRecoveryInApplicationMode() 
throws Exception {
                 new JobResultEntry(
                         TestingJobResultStore.createSuccessfulJobResult(
                                 ApplicationDispatcherBootstrap.ZERO_JOB_ID)));
-        final EmbeddedHaServicesWithLeadershipControl haServices =
-                new 
EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor()) {
-
-                    @Override
-                    public JobResultStore getJobResultStore() {
-                        return jobResultStore;
-                    }
-                };
+        final TestingHighAvailabilityServices haServices =

Review comment:
       Looks like it's the only occurrence of `EmbeddedLeaderElection`. So, we 
could remove it as well

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
##########
@@ -38,228 +48,457 @@
  * A variant of the HighAvailabilityServices for testing. Each individual 
service can be set to an
  * arbitrary implementation, such as a mock or default service.
  */
-public class TestingHighAvailabilityServices implements 
HighAvailabilityServices {
-
-    private volatile LeaderRetrievalService resourceManagerLeaderRetriever;
-
-    private volatile LeaderRetrievalService dispatcherLeaderRetriever;
-
-    private volatile LeaderRetrievalService clusterRestEndpointLeaderRetriever;
+public abstract class TestingHighAvailabilityServices implements 
HighAvailabilityServices {

Review comment:
       I'm wondering whether the complexity of the `Testing*` implementation 
comes from the fact that we're combining two interfaces into one. We have the 
`HAServices` like `JobGraphStore`, `JobResultStore` and 
`CheckpointRecoveryFactory`, which provide the information for jobs to recover, 
and the "`LeadershipServices`" that are responsible for leader election. Should 
we create a follow-up ticket for splitting this up to improve maintainability 
of the code?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
##########
@@ -38,228 +48,457 @@
  * A variant of the HighAvailabilityServices for testing. Each individual 
service can be set to an
  * arbitrary implementation, such as a mock or default service.
  */
-public class TestingHighAvailabilityServices implements 
HighAvailabilityServices {
-
-    private volatile LeaderRetrievalService resourceManagerLeaderRetriever;
-
-    private volatile LeaderRetrievalService dispatcherLeaderRetriever;
-
-    private volatile LeaderRetrievalService clusterRestEndpointLeaderRetriever;
+public abstract class TestingHighAvailabilityServices implements 
HighAvailabilityServices {
+
+    public static EmptyBuilder newBuilder() {
+        return new EmptyBuilder();
+    }
+
+    public static EmptyBuilder newStandaloneBuilder() {
+        return new EmptyBuilder()
+                .setCheckpointRecoveryFactory(new 
StandaloneCheckpointRecoveryFactory())
+                .setJobGraphStore(new StandaloneJobGraphStore())
+                .setDispatcherLeaderRetriever(
+                        new StandaloneLeaderRetrievalService(
+                                "localhost", 
HighAvailabilityServices.DEFAULT_LEADER_ID))
+                .setResourceManagerLeaderRetriever(
+                        new StandaloneLeaderRetrievalService(
+                                "localhost", 
HighAvailabilityServices.DEFAULT_LEADER_ID))
+                .setClusterRestEndpointLeaderRetriever(
+                        new StandaloneLeaderRetrievalService(
+                                "localhost", 
HighAvailabilityServices.DEFAULT_LEADER_ID))
+                .setDispatcherLeaderElectionService(new 
StandaloneLeaderElectionService())
+                .setResourceManagerLeaderElectionService(new 
StandaloneLeaderElectionService())
+                .setClusterRestEndpointLeaderElectionService(new 
StandaloneLeaderElectionService())
+                .setJobMasterLeaderRetrieverFunction(
+                        jobId ->
+                                new StandaloneLeaderRetrievalService(
+                                        "localhost", 
HighAvailabilityServices.DEFAULT_LEADER_ID))
+                .setJobMasterLeaderElectionServiceFunction(
+                        jobId -> new StandaloneLeaderElectionService());
+    }
+
+    public static EmbeddedBuilder newEmbeddedBuilder(Executor executor) {
+        return new EmbeddedBuilder(executor);
+    }
+
+    public abstract static class Builder<T extends Builder<T>> {
+
+        protected @Nullable CheckpointRecoveryFactory 
checkpointRecoveryFactory;
+        protected @Nullable JobGraphStore jobGraphStore;
+        protected JobResultStore jobResultStore = new EmbeddedJobResultStore();
+        protected CompletableFuture<Void> closeFuture = new 
CompletableFuture<>();
+        protected CompletableFuture<Void> closeAndCleanupAllDataFuture = new 
CompletableFuture<>();
+        protected CompletableFuture<JobID> globalCleanupFuture = new 
CompletableFuture<>();
+
+        private Builder() {}
+
+        public T setCheckpointRecoveryFactory(CheckpointRecoveryFactory 
checkpointRecoveryFactory) {
+            this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private volatile Function<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrieverFunction =
-            ignored -> null;
+        public T setJobGraphStore(JobGraphStore jobGraphStore) {
+            this.jobGraphStore = jobGraphStore;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private volatile Function<JobID, LeaderElectionService> 
jobMasterLeaderElectionServiceFunction =
-            ignored -> null;
+        public T setJobResultStore(JobResultStore jobResultStore) {
+            this.jobResultStore = jobResultStore;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private ConcurrentHashMap<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrievers =
-            new ConcurrentHashMap<>();
+        public T setCloseFuture(CompletableFuture<Void> closeFuture) {
+            this.closeFuture = closeFuture;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private ConcurrentHashMap<JobID, LeaderElectionService> 
jobManagerLeaderElectionServices =
-            new ConcurrentHashMap<>();
+        public T setCloseAndCleanupAllDataFuture(
+                CompletableFuture<Void> closeAndCleanupAllDataFuture) {
+            this.closeAndCleanupAllDataFuture = closeAndCleanupAllDataFuture;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private volatile LeaderElectionService 
resourceManagerLeaderElectionService;
+        public T setGlobalCleanupFuture(CompletableFuture<JobID> 
globalCleanupFuture) {
+            this.globalCleanupFuture = globalCleanupFuture;
+            @SuppressWarnings("unchecked")
+            final T cast = (T) this;
+            return cast;
+        }
 
-    private volatile LeaderElectionService dispatcherLeaderElectionService;
+        public abstract TestingHighAvailabilityServices build();
+    }
 
-    private volatile LeaderElectionService 
clusterRestEndpointLeaderElectionService;
+    public static class EmbeddedBuilder extends Builder<EmbeddedBuilder> {
 
-    private volatile CheckpointRecoveryFactory checkpointRecoveryFactory;
+        private final Executor executor;
 
-    private volatile JobGraphStore jobGraphStore;
+        private EmbeddedBuilder(Executor executor) {
+            this.executor = executor;
+        }
 
-    private volatile JobResultStore jobResultStore = new 
EmbeddedJobResultStore();
+        @Override
+        public TestingHighAvailabilityServices build() {
+            try {
+                return new EmbeddedLeaderElection(
+                        checkpointRecoveryFactory,
+                        jobGraphStore,
+                        jobResultStore,
+                        closeFuture,
+                        closeAndCleanupAllDataFuture,
+                        globalCleanupFuture,
+                        new EmbeddedHaServicesWithLeadershipControl(executor));
+            } catch (Exception e) {
+                throw new IllegalStateException("Error building embedded 
services.", e);
+            }
+        }
+    }
 
-    private CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+    public static class EmptyBuilder extends Builder<EmptyBuilder> {
+
+        private LeaderRetrievalService dispatcherLeaderRetriever;
+        private LeaderRetrievalService resourceManagerLeaderRetriever;
+        private LeaderRetrievalService clusterRestEndpointLeaderRetriever;
+
+        private LeaderElectionService resourceManagerLeaderElectionService;
+        private LeaderElectionService dispatcherLeaderElectionService;
+        private LeaderElectionService clusterRestEndpointLeaderElectionService;
+
+        private Function<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrieverFunction =
+                ignored -> null;
+        private Function<JobID, LeaderElectionService> 
jobMasterLeaderElectionServiceFunction =
+                ignored -> null;
+
+        private EmptyBuilder() {}
+
+        public TestingHighAvailabilityServices build() {
+            return new ManualLeaderElection(
+                    checkpointRecoveryFactory,
+                    jobGraphStore,
+                    jobResultStore,
+                    closeFuture,
+                    closeAndCleanupAllDataFuture,
+                    globalCleanupFuture,
+                    dispatcherLeaderRetriever,
+                    resourceManagerLeaderRetriever,
+                    clusterRestEndpointLeaderRetriever,
+                    dispatcherLeaderElectionService,
+                    resourceManagerLeaderElectionService,
+                    clusterRestEndpointLeaderElectionService,
+                    jobMasterLeaderRetrieverFunction,
+                    jobMasterLeaderElectionServiceFunction);
+        }
 
-    private CompletableFuture<Void> closeAndCleanupAllDataFuture = new 
CompletableFuture<>();
+        public EmptyBuilder setResourceManagerLeaderRetriever(
+                LeaderRetrievalService resourceManagerLeaderRetriever) {
+            this.resourceManagerLeaderRetriever = 
resourceManagerLeaderRetriever;
+            return this;
+        }
 
-    private volatile CompletableFuture<JobID> globalCleanupFuture;
+        public EmptyBuilder setDispatcherLeaderRetriever(
+                LeaderRetrievalService dispatcherLeaderRetriever) {
+            this.dispatcherLeaderRetriever = dispatcherLeaderRetriever;
+            return this;
+        }
 
-    // ------------------------------------------------------------------------
-    //  Setters for mock / testing implementations
-    // ------------------------------------------------------------------------
+        public EmptyBuilder setClusterRestEndpointLeaderRetriever(
+                final LeaderRetrievalService 
clusterRestEndpointLeaderRetriever) {
+            this.clusterRestEndpointLeaderRetriever = 
clusterRestEndpointLeaderRetriever;
+            return this;
+        }
 
-    public void setResourceManagerLeaderRetriever(
-            LeaderRetrievalService resourceManagerLeaderRetriever) {
-        this.resourceManagerLeaderRetriever = resourceManagerLeaderRetriever;
-    }
+        public EmptyBuilder setResourceManagerLeaderElectionService(
+                LeaderElectionService leaderElectionService) {
+            this.resourceManagerLeaderElectionService = leaderElectionService;
+            return this;
+        }
 
-    public void setDispatcherLeaderRetriever(LeaderRetrievalService 
dispatcherLeaderRetriever) {
-        this.dispatcherLeaderRetriever = dispatcherLeaderRetriever;
-    }
+        public EmptyBuilder setDispatcherLeaderElectionService(
+                LeaderElectionService leaderElectionService) {
+            this.dispatcherLeaderElectionService = leaderElectionService;
+            return this;
+        }
 
-    public void setClusterRestEndpointLeaderRetriever(
-            final LeaderRetrievalService clusterRestEndpointLeaderRetriever) {
-        this.clusterRestEndpointLeaderRetriever = 
clusterRestEndpointLeaderRetriever;
-    }
+        public EmptyBuilder setClusterRestEndpointLeaderElectionService(
+                final LeaderElectionService 
clusterRestEndpointLeaderElectionService) {
+            this.clusterRestEndpointLeaderElectionService =
+                    clusterRestEndpointLeaderElectionService;
+            return this;
+        }
 
-    public void setJobMasterLeaderRetriever(
-            JobID jobID, LeaderRetrievalService jobMasterLeaderRetriever) {
-        this.jobMasterLeaderRetrievers.put(jobID, jobMasterLeaderRetriever);
-    }
+        public EmptyBuilder setJobMasterLeaderElectionServiceFunction(
+                Function<JobID, LeaderElectionService> 
jobMasterLeaderElectionServiceFunction) {
+            this.jobMasterLeaderElectionServiceFunction = 
jobMasterLeaderElectionServiceFunction;
+            return this;
+        }
 
-    public void setJobMasterLeaderElectionService(
-            JobID jobID, LeaderElectionService leaderElectionService) {
-        this.jobManagerLeaderElectionServices.put(jobID, 
leaderElectionService);
+        public EmptyBuilder setJobMasterLeaderRetrieverFunction(
+                Function<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrieverFunction) {
+            this.jobMasterLeaderRetrieverFunction = 
jobMasterLeaderRetrieverFunction;
+            return this;
+        }
     }
 
-    public void setResourceManagerLeaderElectionService(
-            LeaderElectionService leaderElectionService) {
-        this.resourceManagerLeaderElectionService = leaderElectionService;
-    }
+    public static class EmbeddedLeaderElection extends 
TestingHighAvailabilityServices {
+
+        private final EmbeddedHaServicesWithLeadershipControl embeddedServices;
+
+        private EmbeddedLeaderElection(
+                @Nullable CheckpointRecoveryFactory checkpointRecoveryFactory,
+                @Nullable JobGraphStore jobGraphStore,
+                JobResultStore jobResultStore,
+                CompletableFuture<Void> closeFuture,
+                CompletableFuture<Void> closeAndCleanupAllDataFuture,
+                CompletableFuture<JobID> globalCleanupFuture,
+                EmbeddedHaServicesWithLeadershipControl embeddedServices)
+                throws Exception {
+            super(
+                    MoreObjects.firstNonNull(
+                            checkpointRecoveryFactory,
+                            embeddedServices.getCheckpointRecoveryFactory()),
+                    MoreObjects.firstNonNull(jobGraphStore, 
embeddedServices.getJobGraphStore()),
+                    jobResultStore,
+                    closeFuture,
+                    closeAndCleanupAllDataFuture,
+                    globalCleanupFuture);
+            this.embeddedServices = embeddedServices;
+        }
 
-    public void setDispatcherLeaderElectionService(LeaderElectionService 
leaderElectionService) {
-        this.dispatcherLeaderElectionService = leaderElectionService;
-    }
+        @Override
+        public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+            return embeddedServices.getResourceManagerLeaderRetriever();
+        }
 
-    public void setClusterRestEndpointLeaderElectionService(
-            final LeaderElectionService 
clusterRestEndpointLeaderElectionService) {
-        this.clusterRestEndpointLeaderElectionService = 
clusterRestEndpointLeaderElectionService;
-    }
+        @Override
+        public LeaderRetrievalService getDispatcherLeaderRetriever() {
+            return embeddedServices.getDispatcherLeaderRetriever();
+        }
 
-    public void setCheckpointRecoveryFactory(CheckpointRecoveryFactory 
checkpointRecoveryFactory) {
-        this.checkpointRecoveryFactory = checkpointRecoveryFactory;
-    }
+        @Override
+        public LeaderRetrievalService getJobManagerLeaderRetriever(JobID 
jobId) {
+            return embeddedServices.getJobManagerLeaderRetriever(jobId);
+        }
 
-    public void setJobGraphStore(JobGraphStore jobGraphStore) {
-        this.jobGraphStore = jobGraphStore;
-    }
+        @Override
+        public LeaderRetrievalService getJobManagerLeaderRetriever(
+                JobID jobId, String defaultJobManagerAddress) {
+            return embeddedServices.getJobManagerLeaderRetriever(jobId, 
defaultJobManagerAddress);
+        }
 
-    public void setJobResultStore(JobResultStore jobResultStore) {
-        this.jobResultStore = jobResultStore;
-    }
+        @Override
+        public LeaderElectionService getResourceManagerLeaderElectionService() 
{
+            return embeddedServices.getResourceManagerLeaderElectionService();
+        }
 
-    public void setJobMasterLeaderElectionServiceFunction(
-            Function<JobID, LeaderElectionService> 
jobMasterLeaderElectionServiceFunction) {
-        this.jobMasterLeaderElectionServiceFunction = 
jobMasterLeaderElectionServiceFunction;
-    }
+        @Override
+        public LeaderElectionService getDispatcherLeaderElectionService() {
+            return embeddedServices.getResourceManagerLeaderElectionService();
+        }
 
-    public void setJobMasterLeaderRetrieverFunction(
-            Function<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrieverFunction) {
-        this.jobMasterLeaderRetrieverFunction = 
jobMasterLeaderRetrieverFunction;
-    }
+        @Override
+        public LeaderElectionService getJobManagerLeaderElectionService(JobID 
jobID) {
+            return embeddedServices.getJobManagerLeaderElectionService(jobID);
+        }
 
-    public void setCloseFuture(CompletableFuture<Void> closeFuture) {
-        this.closeFuture = closeFuture;
-    }
+        @Override
+        public LeaderElectionService 
getClusterRestEndpointLeaderElectionService() {
+            return 
embeddedServices.getClusterRestEndpointLeaderElectionService();
+        }
 
-    public void setCloseAndCleanupAllDataFuture(
-            CompletableFuture<Void> closeAndCleanupAllDataFuture) {
-        this.closeAndCleanupAllDataFuture = closeAndCleanupAllDataFuture;
-    }
+        @Override
+        public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
+            return embeddedServices.getClusterRestEndpointLeaderRetriever();
+        }
 
-    public void setGlobalCleanupFuture(CompletableFuture<JobID> 
globalCleanupFuture) {
-        this.globalCleanupFuture = globalCleanupFuture;
+        @Override
+        public HaLeadershipControl getLeadershipControl() {
+            return embeddedServices;
+        }
     }
 
-    // ------------------------------------------------------------------------
-    //  HA Services Methods
-    // ------------------------------------------------------------------------
+    public static class ManualLeaderElection extends 
TestingHighAvailabilityServices {
+
+        @Nullable private final LeaderRetrievalService 
dispatcherLeaderRetriever;
+        @Nullable private final LeaderRetrievalService 
resourceManagerLeaderRetriever;
+        @Nullable private final LeaderRetrievalService 
clusterRestEndpointLeaderRetriever;
+        @Nullable private final LeaderElectionService 
dispatcherLeaderElectionService;
+        @Nullable private final LeaderElectionService 
resourceManagerLeaderElectionService;
+        @Nullable private final LeaderElectionService 
clusterRestEndpointLeaderElectionService;
+
+        private final ConcurrentHashMap<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrievers =
+                new ConcurrentHashMap<>();
+        private final ConcurrentHashMap<JobID, LeaderElectionService>
+                jobManagerLeaderElectionServices = new ConcurrentHashMap<>();
+
+        private final Function<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrieverFunction;
+        private final Function<JobID, LeaderElectionService> 
jobMasterLeaderElectionServiceFunction;
+
+        private ManualLeaderElection(
+                @Nullable CheckpointRecoveryFactory checkpointRecoveryFactory,
+                @Nullable JobGraphStore jobGraphStore,
+                JobResultStore jobResultStore,
+                CompletableFuture<Void> closeFuture,
+                CompletableFuture<Void> closeAndCleanupAllDataFuture,
+                CompletableFuture<JobID> globalCleanupFuture,
+                @Nullable LeaderRetrievalService dispatcherLeaderRetriever,
+                @Nullable LeaderRetrievalService 
resourceManagerLeaderRetriever,
+                @Nullable LeaderRetrievalService 
clusterRestEndpointLeaderRetriever,
+                @Nullable LeaderElectionService 
dispatcherLeaderElectionService,
+                @Nullable LeaderElectionService 
resourceManagerLeaderElectionService,
+                @Nullable LeaderElectionService 
clusterRestEndpointLeaderElectionService,
+                Function<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrieverFunction,
+                Function<JobID, LeaderElectionService> 
jobMasterLeaderElectionServiceFunction) {
+            super(
+                    checkpointRecoveryFactory,
+                    jobGraphStore,
+                    jobResultStore,
+                    closeFuture,
+                    closeAndCleanupAllDataFuture,
+                    globalCleanupFuture);
+            this.dispatcherLeaderRetriever = dispatcherLeaderRetriever;
+            this.resourceManagerLeaderRetriever = 
resourceManagerLeaderRetriever;
+            this.clusterRestEndpointLeaderRetriever = 
clusterRestEndpointLeaderRetriever;
+            this.dispatcherLeaderElectionService = 
dispatcherLeaderElectionService;
+            this.resourceManagerLeaderElectionService = 
resourceManagerLeaderElectionService;
+            this.clusterRestEndpointLeaderElectionService =
+                    clusterRestEndpointLeaderElectionService;
+            this.jobMasterLeaderRetrieverFunction = 
jobMasterLeaderRetrieverFunction;
+            this.jobMasterLeaderElectionServiceFunction = 
jobMasterLeaderElectionServiceFunction;
+        }
 
-    @Override
-    public LeaderRetrievalService getResourceManagerLeaderRetriever() {
-        LeaderRetrievalService service = this.resourceManagerLeaderRetriever;
-        if (service != null) {
-            return service;
-        } else {
-            throw new IllegalStateException("ResourceManagerLeaderRetriever 
has not been set");
+        @Override
+        public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+            if (resourceManagerLeaderRetriever == null) {
+                throw new 
IllegalStateException("ResourceManagerLeaderRetriever has not been set");
+            }
+            return resourceManagerLeaderRetriever;
         }
-    }
 
-    @Override
-    public LeaderRetrievalService getDispatcherLeaderRetriever() {
-        LeaderRetrievalService service = this.dispatcherLeaderRetriever;
-        if (service != null) {
-            return service;
-        } else {
-            throw new IllegalStateException("ResourceManagerLeaderRetriever 
has not been set");
+        @Override
+        public LeaderRetrievalService getDispatcherLeaderRetriever() {
+            if (dispatcherLeaderRetriever == null) {
+                throw new 
IllegalStateException("ResourceManagerLeaderRetriever has not been set");
+            }
+            return dispatcherLeaderRetriever;
         }
-    }
 
-    @Override
-    public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
-        LeaderRetrievalService service =
-                jobMasterLeaderRetrievers.computeIfAbsent(jobID, 
jobMasterLeaderRetrieverFunction);
-        if (service != null) {
+        @Override
+        public LeaderRetrievalService getJobManagerLeaderRetriever(JobID 
jobID) {
+            @Nullable
+            final LeaderRetrievalService service =
+                    jobMasterLeaderRetrievers.computeIfAbsent(
+                            jobID, jobMasterLeaderRetrieverFunction);
+            if (service == null) {
+                throw new IllegalStateException("JobMasterLeaderRetriever has 
not been set");
+            }
             return service;
-        } else {
-            throw new IllegalStateException("JobMasterLeaderRetriever has not 
been set");
         }
-    }
 
-    @Override
-    public LeaderRetrievalService getJobManagerLeaderRetriever(
-            JobID jobID, String defaultJobManagerAddress) {
-        return getJobManagerLeaderRetriever(jobID);
-    }
+        @Override
+        public LeaderRetrievalService getJobManagerLeaderRetriever(
+                JobID jobID, String defaultJobManagerAddress) {
+            return getJobManagerLeaderRetriever(jobID);
+        }
 
-    @Override
-    public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
-        return clusterRestEndpointLeaderRetriever;
-    }
+        @Override
+        public LeaderElectionService getResourceManagerLeaderElectionService() 
{
+            if (resourceManagerLeaderElectionService == null) {
+                throw new IllegalStateException(
+                        "ResourceManagerLeaderElectionService has not been 
set");
+            }
+            return resourceManagerLeaderElectionService;
+        }
 
-    @Override
-    public LeaderElectionService getResourceManagerLeaderElectionService() {
-        LeaderElectionService service = resourceManagerLeaderElectionService;
+        @Override
+        public LeaderElectionService getDispatcherLeaderElectionService() {
+            if (dispatcherLeaderElectionService == null) {
+                throw new 
IllegalStateException("DispatcherLeaderElectionService has not been set");
+            }
+            return dispatcherLeaderElectionService;
+        }
 
-        if (service != null) {
+        @Override
+        public LeaderElectionService getJobManagerLeaderElectionService(JobID 
jobID) {
+            @Nullable
+            final LeaderElectionService service =
+                    jobManagerLeaderElectionServices.computeIfAbsent(
+                            jobID, jobMasterLeaderElectionServiceFunction);
+            if (service == null) {
+                throw new 
IllegalStateException("JobMasterLeaderElectionService has not been set");
+            }
             return service;
-        } else {
-            throw new IllegalStateException(
-                    "ResourceManagerLeaderElectionService has not been set");
         }
-    }
-
-    @Override
-    public LeaderElectionService getDispatcherLeaderElectionService() {
-        LeaderElectionService service = dispatcherLeaderElectionService;
 
-        if (service != null) {
-            return service;
-        } else {
-            throw new IllegalStateException("DispatcherLeaderElectionService 
has not been set");
+        @Override
+        public LeaderElectionService 
getClusterRestEndpointLeaderElectionService() {
+            if (clusterRestEndpointLeaderElectionService == null) {
+                throw new IllegalStateException(
+                        "ClusterRestEndpointLeaderElectionService has not been 
set");
+            }
+            return dispatcherLeaderElectionService;
         }
-    }
 
-    @Override
-    public LeaderElectionService getJobManagerLeaderElectionService(JobID 
jobID) {
-        LeaderElectionService service =
-                jobManagerLeaderElectionServices.computeIfAbsent(
-                        jobID, jobMasterLeaderElectionServiceFunction);
+        @Override
+        public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
+            if (clusterRestEndpointLeaderRetriever == null) {
+                throw new IllegalStateException(
+                        "ClusterRestEndpointLeaderRetriever has not been set");
+            }
+            return dispatcherLeaderRetriever;
+        }
 
-        if (service != null) {
-            return service;
-        } else {
-            throw new IllegalStateException("JobMasterLeaderElectionService 
has not been set");
+        @Override
+        public HaLeadershipControl getLeadershipControl() {
+            throw new UnsupportedOperationException();
         }
     }
 
-    @Override
-    public LeaderElectionService getClusterRestEndpointLeaderElectionService() 
{
-        return clusterRestEndpointLeaderElectionService;
+    @Nullable protected final CheckpointRecoveryFactory 
checkpointRecoveryFactory;
+    @Nullable private final JobGraphStore jobGraphStore;
+    private final JobResultStore jobResultStore;
+    private final CompletableFuture<Void> closeFuture;
+    private final CompletableFuture<Void> closeAndCleanupAllDataFuture;
+    private final CompletableFuture<JobID> globalCleanupFuture;

Review comment:
       I guess you didn't add the `blobStore` here as a member because this PR 
is more about uniting functionality and not adding new things?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
##########
@@ -114,9 +114,19 @@ private TaskSubmissionTestEnvironment(
             ShuffleEnvironment<?, ?> shuffleEnvironment)
             throws Exception {
 
-        this.haServices = new TestingHighAvailabilityServices();
-        this.haServices.setResourceManagerLeaderRetriever(new 
SettableLeaderRetrievalService());
-        this.haServices.setJobMasterLeaderRetriever(jobId, new 
SettableLeaderRetrievalService());
+        this.haServices =
+                TestingHighAvailabilityServices.newBuilder()
+                        .setResourceManagerLeaderRetriever(new 
SettableLeaderRetrievalService())
+                        .setJobMasterLeaderRetrieverFunction(

Review comment:
       What about providing a utility method:
   ```
   Function<JobID, LeaderRetrievalService> withJobMasterLeaderRetrievalService(
       JobID expectedJobId, Supplier<LeaderRetrievalService> serviceSupplier) {
     return actualJobId -> {
       if (!expectedJobId.equals(actualJobId)) {
         throw new IllegalArgumentException(
           String.format("Expected %s, got %s.", expectedJobId, actualJobId));
       }
       return serviceSupplier.get();
     };
   }
   ```
   
   ...since it's used in quite a few places (with `assertEquals`).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to