XComp commented on a change in pull request #18650:
URL: https://github.com/apache/flink/pull/18650#discussion_r801459535
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
##########
@@ -38,228 +48,457 @@
* A variant of the HighAvailabilityServices for testing. Each individual
service can be set to an
* arbitrary implementation, such as a mock or default service.
*/
-public class TestingHighAvailabilityServices implements
HighAvailabilityServices {
-
- private volatile LeaderRetrievalService resourceManagerLeaderRetriever;
-
- private volatile LeaderRetrievalService dispatcherLeaderRetriever;
-
- private volatile LeaderRetrievalService clusterRestEndpointLeaderRetriever;
+public abstract class TestingHighAvailabilityServices implements
HighAvailabilityServices {
+
+ public static EmptyBuilder newBuilder() {
+ return new EmptyBuilder();
+ }
+
+ public static EmptyBuilder newStandaloneBuilder() {
+ return new EmptyBuilder()
+ .setCheckpointRecoveryFactory(new
StandaloneCheckpointRecoveryFactory())
+ .setJobGraphStore(new StandaloneJobGraphStore())
+ .setDispatcherLeaderRetriever(
+ new StandaloneLeaderRetrievalService(
+ "localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID))
+ .setResourceManagerLeaderRetriever(
+ new StandaloneLeaderRetrievalService(
+ "localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID))
+ .setClusterRestEndpointLeaderRetriever(
+ new StandaloneLeaderRetrievalService(
+ "localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID))
+ .setDispatcherLeaderElectionService(new
StandaloneLeaderElectionService())
+ .setResourceManagerLeaderElectionService(new
StandaloneLeaderElectionService())
+ .setClusterRestEndpointLeaderElectionService(new
StandaloneLeaderElectionService())
+ .setJobMasterLeaderRetrieverFunction(
+ jobId ->
+ new StandaloneLeaderRetrievalService(
+ "localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID))
+ .setJobMasterLeaderElectionServiceFunction(
+ jobId -> new StandaloneLeaderElectionService());
+ }
+
+ public static EmbeddedBuilder newEmbeddedBuilder(Executor executor) {
+ return new EmbeddedBuilder(executor);
+ }
+
+ public abstract static class Builder<T extends Builder<T>> {
+
+ protected @Nullable CheckpointRecoveryFactory
checkpointRecoveryFactory;
+ protected @Nullable JobGraphStore jobGraphStore;
+ protected JobResultStore jobResultStore = new EmbeddedJobResultStore();
+ protected CompletableFuture<Void> closeFuture = new
CompletableFuture<>();
+ protected CompletableFuture<Void> closeAndCleanupAllDataFuture = new
CompletableFuture<>();
+ protected CompletableFuture<JobID> globalCleanupFuture = new
CompletableFuture<>();
+
+ private Builder() {}
+
+ public T setCheckpointRecoveryFactory(CheckpointRecoveryFactory
checkpointRecoveryFactory) {
+ this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private volatile Function<JobID, LeaderRetrievalService>
jobMasterLeaderRetrieverFunction =
- ignored -> null;
+ public T setJobGraphStore(JobGraphStore jobGraphStore) {
+ this.jobGraphStore = jobGraphStore;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private volatile Function<JobID, LeaderElectionService>
jobMasterLeaderElectionServiceFunction =
- ignored -> null;
+ public T setJobResultStore(JobResultStore jobResultStore) {
+ this.jobResultStore = jobResultStore;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private ConcurrentHashMap<JobID, LeaderRetrievalService>
jobMasterLeaderRetrievers =
- new ConcurrentHashMap<>();
+ public T setCloseFuture(CompletableFuture<Void> closeFuture) {
+ this.closeFuture = closeFuture;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private ConcurrentHashMap<JobID, LeaderElectionService>
jobManagerLeaderElectionServices =
- new ConcurrentHashMap<>();
+ public T setCloseAndCleanupAllDataFuture(
+ CompletableFuture<Void> closeAndCleanupAllDataFuture) {
+ this.closeAndCleanupAllDataFuture = closeAndCleanupAllDataFuture;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private volatile LeaderElectionService
resourceManagerLeaderElectionService;
+ public T setGlobalCleanupFuture(CompletableFuture<JobID>
globalCleanupFuture) {
+ this.globalCleanupFuture = globalCleanupFuture;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private volatile LeaderElectionService dispatcherLeaderElectionService;
+ public abstract TestingHighAvailabilityServices build();
+ }
- private volatile LeaderElectionService
clusterRestEndpointLeaderElectionService;
+ public static class EmbeddedBuilder extends Builder<EmbeddedBuilder> {
- private volatile CheckpointRecoveryFactory checkpointRecoveryFactory;
+ private final Executor executor;
- private volatile JobGraphStore jobGraphStore;
+ private EmbeddedBuilder(Executor executor) {
+ this.executor = executor;
+ }
- private volatile JobResultStore jobResultStore = new
EmbeddedJobResultStore();
+ @Override
+ public TestingHighAvailabilityServices build() {
+ try {
+ return new EmbeddedLeaderElection(
+ checkpointRecoveryFactory,
+ jobGraphStore,
+ jobResultStore,
+ closeFuture,
+ closeAndCleanupAllDataFuture,
+ globalCleanupFuture,
+ new EmbeddedHaServicesWithLeadershipControl(executor));
+ } catch (Exception e) {
+ throw new IllegalStateException("Error building embedded
services.", e);
+ }
+ }
+ }
- private CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+ public static class EmptyBuilder extends Builder<EmptyBuilder> {
+
+ private LeaderRetrievalService dispatcherLeaderRetriever;
+ private LeaderRetrievalService resourceManagerLeaderRetriever;
+ private LeaderRetrievalService clusterRestEndpointLeaderRetriever;
+
+ private LeaderElectionService resourceManagerLeaderElectionService;
+ private LeaderElectionService dispatcherLeaderElectionService;
+ private LeaderElectionService clusterRestEndpointLeaderElectionService;
+
+ private Function<JobID, LeaderRetrievalService>
jobMasterLeaderRetrieverFunction =
+ ignored -> null;
+ private Function<JobID, LeaderElectionService>
jobMasterLeaderElectionServiceFunction =
+ ignored -> null;
+
+ private EmptyBuilder() {}
+
+ public TestingHighAvailabilityServices build() {
+ return new ManualLeaderElection(
+ checkpointRecoveryFactory,
+ jobGraphStore,
+ jobResultStore,
+ closeFuture,
+ closeAndCleanupAllDataFuture,
+ globalCleanupFuture,
+ dispatcherLeaderRetriever,
+ resourceManagerLeaderRetriever,
+ clusterRestEndpointLeaderRetriever,
+ dispatcherLeaderElectionService,
+ resourceManagerLeaderElectionService,
+ clusterRestEndpointLeaderElectionService,
+ jobMasterLeaderRetrieverFunction,
+ jobMasterLeaderElectionServiceFunction);
+ }
- private CompletableFuture<Void> closeAndCleanupAllDataFuture = new
CompletableFuture<>();
+ public EmptyBuilder setResourceManagerLeaderRetriever(
+ LeaderRetrievalService resourceManagerLeaderRetriever) {
+ this.resourceManagerLeaderRetriever =
resourceManagerLeaderRetriever;
+ return this;
+ }
- private volatile CompletableFuture<JobID> globalCleanupFuture;
+ public EmptyBuilder setDispatcherLeaderRetriever(
+ LeaderRetrievalService dispatcherLeaderRetriever) {
+ this.dispatcherLeaderRetriever = dispatcherLeaderRetriever;
+ return this;
+ }
- // ------------------------------------------------------------------------
- // Setters for mock / testing implementations
- // ------------------------------------------------------------------------
+ public EmptyBuilder setClusterRestEndpointLeaderRetriever(
+ final LeaderRetrievalService
clusterRestEndpointLeaderRetriever) {
+ this.clusterRestEndpointLeaderRetriever =
clusterRestEndpointLeaderRetriever;
+ return this;
+ }
- public void setResourceManagerLeaderRetriever(
- LeaderRetrievalService resourceManagerLeaderRetriever) {
- this.resourceManagerLeaderRetriever = resourceManagerLeaderRetriever;
- }
+ public EmptyBuilder setResourceManagerLeaderElectionService(
+ LeaderElectionService leaderElectionService) {
+ this.resourceManagerLeaderElectionService = leaderElectionService;
+ return this;
+ }
- public void setDispatcherLeaderRetriever(LeaderRetrievalService
dispatcherLeaderRetriever) {
- this.dispatcherLeaderRetriever = dispatcherLeaderRetriever;
- }
+ public EmptyBuilder setDispatcherLeaderElectionService(
+ LeaderElectionService leaderElectionService) {
+ this.dispatcherLeaderElectionService = leaderElectionService;
+ return this;
+ }
- public void setClusterRestEndpointLeaderRetriever(
- final LeaderRetrievalService clusterRestEndpointLeaderRetriever) {
- this.clusterRestEndpointLeaderRetriever =
clusterRestEndpointLeaderRetriever;
- }
+ public EmptyBuilder setClusterRestEndpointLeaderElectionService(
+ final LeaderElectionService
clusterRestEndpointLeaderElectionService) {
+ this.clusterRestEndpointLeaderElectionService =
+ clusterRestEndpointLeaderElectionService;
+ return this;
+ }
- public void setJobMasterLeaderRetriever(
- JobID jobID, LeaderRetrievalService jobMasterLeaderRetriever) {
- this.jobMasterLeaderRetrievers.put(jobID, jobMasterLeaderRetriever);
- }
+ public EmptyBuilder setJobMasterLeaderElectionServiceFunction(
+ Function<JobID, LeaderElectionService>
jobMasterLeaderElectionServiceFunction) {
+ this.jobMasterLeaderElectionServiceFunction =
jobMasterLeaderElectionServiceFunction;
+ return this;
+ }
- public void setJobMasterLeaderElectionService(
- JobID jobID, LeaderElectionService leaderElectionService) {
- this.jobManagerLeaderElectionServices.put(jobID,
leaderElectionService);
+ public EmptyBuilder setJobMasterLeaderRetrieverFunction(
+ Function<JobID, LeaderRetrievalService>
jobMasterLeaderRetrieverFunction) {
+ this.jobMasterLeaderRetrieverFunction =
jobMasterLeaderRetrieverFunction;
+ return this;
+ }
}
- public void setResourceManagerLeaderElectionService(
- LeaderElectionService leaderElectionService) {
- this.resourceManagerLeaderElectionService = leaderElectionService;
- }
+ public static class EmbeddedLeaderElection extends
TestingHighAvailabilityServices {
Review comment:
The naming is confusing. Can we rename this to something like
`TestingHighAvailabilityServicesWithManualLeaderElection` or
`TestingHAServicesWithManualLeaderElection` or
`ManualLeaderElectionHAServices`? I initially got confused when reading the
code and seeing that the `EmbeddedBuilder` builds an `EmbeddedLeaderElection`.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
##########
@@ -38,228 +48,457 @@
* A variant of the HighAvailabilityServices for testing. Each individual
service can be set to an
* arbitrary implementation, such as a mock or default service.
*/
-public class TestingHighAvailabilityServices implements
HighAvailabilityServices {
-
- private volatile LeaderRetrievalService resourceManagerLeaderRetriever;
-
- private volatile LeaderRetrievalService dispatcherLeaderRetriever;
-
- private volatile LeaderRetrievalService clusterRestEndpointLeaderRetriever;
+public abstract class TestingHighAvailabilityServices implements
HighAvailabilityServices {
Review comment:
~~Don't you think that it's worth it splitting this class up? There are
so many implementations with various members that it's easy to lose track of
which subclass implements what functionality.~~
It looks like we could get rid of the `Embedded*` implementation considering
that it's only used in the `ApplicationDispatcherBootstrapITCase` where it's
not utilized as far as I can see... That would help make this code less
complex.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
##########
@@ -38,228 +48,457 @@
* A variant of the HighAvailabilityServices for testing. Each individual
service can be set to an
* arbitrary implementation, such as a mock or default service.
*/
-public class TestingHighAvailabilityServices implements
HighAvailabilityServices {
-
- private volatile LeaderRetrievalService resourceManagerLeaderRetriever;
-
- private volatile LeaderRetrievalService dispatcherLeaderRetriever;
-
- private volatile LeaderRetrievalService clusterRestEndpointLeaderRetriever;
+public abstract class TestingHighAvailabilityServices implements
HighAvailabilityServices {
+
+ public static EmptyBuilder newBuilder() {
+ return new EmptyBuilder();
+ }
+
+ public static EmptyBuilder newStandaloneBuilder() {
+ return new EmptyBuilder()
+ .setCheckpointRecoveryFactory(new
StandaloneCheckpointRecoveryFactory())
+ .setJobGraphStore(new StandaloneJobGraphStore())
+ .setDispatcherLeaderRetriever(
+ new StandaloneLeaderRetrievalService(
+ "localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID))
+ .setResourceManagerLeaderRetriever(
+ new StandaloneLeaderRetrievalService(
+ "localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID))
+ .setClusterRestEndpointLeaderRetriever(
+ new StandaloneLeaderRetrievalService(
+ "localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID))
+ .setDispatcherLeaderElectionService(new
StandaloneLeaderElectionService())
+ .setResourceManagerLeaderElectionService(new
StandaloneLeaderElectionService())
+ .setClusterRestEndpointLeaderElectionService(new
StandaloneLeaderElectionService())
+ .setJobMasterLeaderRetrieverFunction(
+ jobId ->
+ new StandaloneLeaderRetrievalService(
+ "localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID))
+ .setJobMasterLeaderElectionServiceFunction(
+ jobId -> new StandaloneLeaderElectionService());
+ }
+
+ public static EmbeddedBuilder newEmbeddedBuilder(Executor executor) {
+ return new EmbeddedBuilder(executor);
+ }
+
+ public abstract static class Builder<T extends Builder<T>> {
+
+ protected @Nullable CheckpointRecoveryFactory
checkpointRecoveryFactory;
+ protected @Nullable JobGraphStore jobGraphStore;
+ protected JobResultStore jobResultStore = new EmbeddedJobResultStore();
+ protected CompletableFuture<Void> closeFuture = new
CompletableFuture<>();
+ protected CompletableFuture<Void> closeAndCleanupAllDataFuture = new
CompletableFuture<>();
+ protected CompletableFuture<JobID> globalCleanupFuture = new
CompletableFuture<>();
+
+ private Builder() {}
+
+ public T setCheckpointRecoveryFactory(CheckpointRecoveryFactory
checkpointRecoveryFactory) {
+ this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private volatile Function<JobID, LeaderRetrievalService>
jobMasterLeaderRetrieverFunction =
- ignored -> null;
+ public T setJobGraphStore(JobGraphStore jobGraphStore) {
+ this.jobGraphStore = jobGraphStore;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private volatile Function<JobID, LeaderElectionService>
jobMasterLeaderElectionServiceFunction =
- ignored -> null;
+ public T setJobResultStore(JobResultStore jobResultStore) {
+ this.jobResultStore = jobResultStore;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private ConcurrentHashMap<JobID, LeaderRetrievalService>
jobMasterLeaderRetrievers =
- new ConcurrentHashMap<>();
+ public T setCloseFuture(CompletableFuture<Void> closeFuture) {
+ this.closeFuture = closeFuture;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private ConcurrentHashMap<JobID, LeaderElectionService>
jobManagerLeaderElectionServices =
- new ConcurrentHashMap<>();
+ public T setCloseAndCleanupAllDataFuture(
+ CompletableFuture<Void> closeAndCleanupAllDataFuture) {
+ this.closeAndCleanupAllDataFuture = closeAndCleanupAllDataFuture;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private volatile LeaderElectionService
resourceManagerLeaderElectionService;
+ public T setGlobalCleanupFuture(CompletableFuture<JobID>
globalCleanupFuture) {
+ this.globalCleanupFuture = globalCleanupFuture;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private volatile LeaderElectionService dispatcherLeaderElectionService;
+ public abstract TestingHighAvailabilityServices build();
+ }
- private volatile LeaderElectionService
clusterRestEndpointLeaderElectionService;
+ public static class EmbeddedBuilder extends Builder<EmbeddedBuilder> {
- private volatile CheckpointRecoveryFactory checkpointRecoveryFactory;
+ private final Executor executor;
- private volatile JobGraphStore jobGraphStore;
+ private EmbeddedBuilder(Executor executor) {
+ this.executor = executor;
+ }
- private volatile JobResultStore jobResultStore = new
EmbeddedJobResultStore();
+ @Override
+ public TestingHighAvailabilityServices build() {
+ try {
+ return new EmbeddedLeaderElection(
+ checkpointRecoveryFactory,
+ jobGraphStore,
+ jobResultStore,
+ closeFuture,
+ closeAndCleanupAllDataFuture,
+ globalCleanupFuture,
+ new EmbeddedHaServicesWithLeadershipControl(executor));
+ } catch (Exception e) {
+ throw new IllegalStateException("Error building embedded
services.", e);
+ }
+ }
+ }
- private CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+ public static class EmptyBuilder extends Builder<EmptyBuilder> {
+
+ private LeaderRetrievalService dispatcherLeaderRetriever;
+ private LeaderRetrievalService resourceManagerLeaderRetriever;
+ private LeaderRetrievalService clusterRestEndpointLeaderRetriever;
+
+ private LeaderElectionService resourceManagerLeaderElectionService;
+ private LeaderElectionService dispatcherLeaderElectionService;
+ private LeaderElectionService clusterRestEndpointLeaderElectionService;
+
+ private Function<JobID, LeaderRetrievalService>
jobMasterLeaderRetrieverFunction =
+ ignored -> null;
+ private Function<JobID, LeaderElectionService>
jobMasterLeaderElectionServiceFunction =
+ ignored -> null;
+
+ private EmptyBuilder() {}
+
+ public TestingHighAvailabilityServices build() {
+ return new ManualLeaderElection(
+ checkpointRecoveryFactory,
+ jobGraphStore,
+ jobResultStore,
+ closeFuture,
+ closeAndCleanupAllDataFuture,
+ globalCleanupFuture,
+ dispatcherLeaderRetriever,
+ resourceManagerLeaderRetriever,
+ clusterRestEndpointLeaderRetriever,
+ dispatcherLeaderElectionService,
+ resourceManagerLeaderElectionService,
+ clusterRestEndpointLeaderElectionService,
+ jobMasterLeaderRetrieverFunction,
+ jobMasterLeaderElectionServiceFunction);
+ }
- private CompletableFuture<Void> closeAndCleanupAllDataFuture = new
CompletableFuture<>();
+ public EmptyBuilder setResourceManagerLeaderRetriever(
+ LeaderRetrievalService resourceManagerLeaderRetriever) {
+ this.resourceManagerLeaderRetriever =
resourceManagerLeaderRetriever;
+ return this;
+ }
- private volatile CompletableFuture<JobID> globalCleanupFuture;
+ public EmptyBuilder setDispatcherLeaderRetriever(
+ LeaderRetrievalService dispatcherLeaderRetriever) {
+ this.dispatcherLeaderRetriever = dispatcherLeaderRetriever;
+ return this;
+ }
- // ------------------------------------------------------------------------
- // Setters for mock / testing implementations
- // ------------------------------------------------------------------------
+ public EmptyBuilder setClusterRestEndpointLeaderRetriever(
+ final LeaderRetrievalService
clusterRestEndpointLeaderRetriever) {
+ this.clusterRestEndpointLeaderRetriever =
clusterRestEndpointLeaderRetriever;
+ return this;
+ }
- public void setResourceManagerLeaderRetriever(
- LeaderRetrievalService resourceManagerLeaderRetriever) {
- this.resourceManagerLeaderRetriever = resourceManagerLeaderRetriever;
- }
+ public EmptyBuilder setResourceManagerLeaderElectionService(
+ LeaderElectionService leaderElectionService) {
+ this.resourceManagerLeaderElectionService = leaderElectionService;
+ return this;
+ }
- public void setDispatcherLeaderRetriever(LeaderRetrievalService
dispatcherLeaderRetriever) {
- this.dispatcherLeaderRetriever = dispatcherLeaderRetriever;
- }
+ public EmptyBuilder setDispatcherLeaderElectionService(
+ LeaderElectionService leaderElectionService) {
+ this.dispatcherLeaderElectionService = leaderElectionService;
+ return this;
+ }
- public void setClusterRestEndpointLeaderRetriever(
- final LeaderRetrievalService clusterRestEndpointLeaderRetriever) {
- this.clusterRestEndpointLeaderRetriever =
clusterRestEndpointLeaderRetriever;
- }
+ public EmptyBuilder setClusterRestEndpointLeaderElectionService(
+ final LeaderElectionService
clusterRestEndpointLeaderElectionService) {
+ this.clusterRestEndpointLeaderElectionService =
+ clusterRestEndpointLeaderElectionService;
+ return this;
+ }
- public void setJobMasterLeaderRetriever(
- JobID jobID, LeaderRetrievalService jobMasterLeaderRetriever) {
- this.jobMasterLeaderRetrievers.put(jobID, jobMasterLeaderRetriever);
- }
+ public EmptyBuilder setJobMasterLeaderElectionServiceFunction(
+ Function<JobID, LeaderElectionService>
jobMasterLeaderElectionServiceFunction) {
+ this.jobMasterLeaderElectionServiceFunction =
jobMasterLeaderElectionServiceFunction;
+ return this;
+ }
- public void setJobMasterLeaderElectionService(
- JobID jobID, LeaderElectionService leaderElectionService) {
- this.jobManagerLeaderElectionServices.put(jobID,
leaderElectionService);
+ public EmptyBuilder setJobMasterLeaderRetrieverFunction(
+ Function<JobID, LeaderRetrievalService>
jobMasterLeaderRetrieverFunction) {
+ this.jobMasterLeaderRetrieverFunction =
jobMasterLeaderRetrieverFunction;
+ return this;
+ }
}
- public void setResourceManagerLeaderElectionService(
- LeaderElectionService leaderElectionService) {
- this.resourceManagerLeaderElectionService = leaderElectionService;
- }
+ public static class EmbeddedLeaderElection extends
TestingHighAvailabilityServices {
+
+ private final EmbeddedHaServicesWithLeadershipControl embeddedServices;
+
+ private EmbeddedLeaderElection(
+ @Nullable CheckpointRecoveryFactory checkpointRecoveryFactory,
+ @Nullable JobGraphStore jobGraphStore,
+ JobResultStore jobResultStore,
+ CompletableFuture<Void> closeFuture,
+ CompletableFuture<Void> closeAndCleanupAllDataFuture,
+ CompletableFuture<JobID> globalCleanupFuture,
+ EmbeddedHaServicesWithLeadershipControl embeddedServices)
+ throws Exception {
+ super(
+ MoreObjects.firstNonNull(
+ checkpointRecoveryFactory,
+ embeddedServices.getCheckpointRecoveryFactory()),
+ MoreObjects.firstNonNull(jobGraphStore,
embeddedServices.getJobGraphStore()),
+ jobResultStore,
+ closeFuture,
+ closeAndCleanupAllDataFuture,
+ globalCleanupFuture);
+ this.embeddedServices = embeddedServices;
+ }
- public void setDispatcherLeaderElectionService(LeaderElectionService
leaderElectionService) {
- this.dispatcherLeaderElectionService = leaderElectionService;
- }
+ @Override
+ public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+ return embeddedServices.getResourceManagerLeaderRetriever();
+ }
- public void setClusterRestEndpointLeaderElectionService(
- final LeaderElectionService
clusterRestEndpointLeaderElectionService) {
- this.clusterRestEndpointLeaderElectionService =
clusterRestEndpointLeaderElectionService;
- }
+ @Override
+ public LeaderRetrievalService getDispatcherLeaderRetriever() {
+ return embeddedServices.getDispatcherLeaderRetriever();
+ }
- public void setCheckpointRecoveryFactory(CheckpointRecoveryFactory
checkpointRecoveryFactory) {
- this.checkpointRecoveryFactory = checkpointRecoveryFactory;
- }
+ @Override
+ public LeaderRetrievalService getJobManagerLeaderRetriever(JobID
jobId) {
+ return embeddedServices.getJobManagerLeaderRetriever(jobId);
+ }
- public void setJobGraphStore(JobGraphStore jobGraphStore) {
- this.jobGraphStore = jobGraphStore;
- }
+ @Override
+ public LeaderRetrievalService getJobManagerLeaderRetriever(
+ JobID jobId, String defaultJobManagerAddress) {
+ return embeddedServices.getJobManagerLeaderRetriever(jobId,
defaultJobManagerAddress);
+ }
- public void setJobResultStore(JobResultStore jobResultStore) {
- this.jobResultStore = jobResultStore;
- }
+ @Override
+ public LeaderElectionService getResourceManagerLeaderElectionService()
{
+ return embeddedServices.getResourceManagerLeaderElectionService();
+ }
- public void setJobMasterLeaderElectionServiceFunction(
- Function<JobID, LeaderElectionService>
jobMasterLeaderElectionServiceFunction) {
- this.jobMasterLeaderElectionServiceFunction =
jobMasterLeaderElectionServiceFunction;
- }
+ @Override
+ public LeaderElectionService getDispatcherLeaderElectionService() {
+ return embeddedServices.getResourceManagerLeaderElectionService();
+ }
- public void setJobMasterLeaderRetrieverFunction(
- Function<JobID, LeaderRetrievalService>
jobMasterLeaderRetrieverFunction) {
- this.jobMasterLeaderRetrieverFunction =
jobMasterLeaderRetrieverFunction;
- }
+ @Override
+ public LeaderElectionService getJobManagerLeaderElectionService(JobID
jobID) {
+ return embeddedServices.getJobManagerLeaderElectionService(jobID);
+ }
- public void setCloseFuture(CompletableFuture<Void> closeFuture) {
- this.closeFuture = closeFuture;
- }
+ @Override
+ public LeaderElectionService
getClusterRestEndpointLeaderElectionService() {
+ return
embeddedServices.getClusterRestEndpointLeaderElectionService();
+ }
- public void setCloseAndCleanupAllDataFuture(
- CompletableFuture<Void> closeAndCleanupAllDataFuture) {
- this.closeAndCleanupAllDataFuture = closeAndCleanupAllDataFuture;
- }
+ @Override
+ public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
+ return embeddedServices.getClusterRestEndpointLeaderRetriever();
+ }
- public void setGlobalCleanupFuture(CompletableFuture<JobID>
globalCleanupFuture) {
- this.globalCleanupFuture = globalCleanupFuture;
+ @Override
+ public HaLeadershipControl getLeadershipControl() {
+ return embeddedServices;
+ }
}
- // ------------------------------------------------------------------------
- // HA Services Methods
- // ------------------------------------------------------------------------
+ public static class ManualLeaderElection extends
TestingHighAvailabilityServices {
Review comment:
Same as what I already suggested for the `EmbeddedLeaderElection`: Can
we rename it to something indicating that it's actually an HAServices
implementation? It looks like that's because the old implementation implemented
Builder functionality with setters as well. That means that we could move this
check into the constructors.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
##########
@@ -38,228 +48,457 @@
* A variant of the HighAvailabilityServices for testing. Each individual
service can be set to an
* arbitrary implementation, such as a mock or default service.
*/
-public class TestingHighAvailabilityServices implements
HighAvailabilityServices {
-
- private volatile LeaderRetrievalService resourceManagerLeaderRetriever;
-
- private volatile LeaderRetrievalService dispatcherLeaderRetriever;
-
- private volatile LeaderRetrievalService clusterRestEndpointLeaderRetriever;
+public abstract class TestingHighAvailabilityServices implements
HighAvailabilityServices {
+
+ public static EmptyBuilder newBuilder() {
Review comment:
Could we add some JavaDoc to the `new*Builder` methods to describe the
use-cases in which each of these methods should be used?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
##########
@@ -38,228 +48,457 @@
* A variant of the HighAvailabilityServices for testing. Each individual
service can be set to an
* arbitrary implementation, such as a mock or default service.
*/
-public class TestingHighAvailabilityServices implements
HighAvailabilityServices {
-
- private volatile LeaderRetrievalService resourceManagerLeaderRetriever;
-
- private volatile LeaderRetrievalService dispatcherLeaderRetriever;
-
- private volatile LeaderRetrievalService clusterRestEndpointLeaderRetriever;
+public abstract class TestingHighAvailabilityServices implements
HighAvailabilityServices {
+
+ public static EmptyBuilder newBuilder() {
+ return new EmptyBuilder();
+ }
+
+ public static EmptyBuilder newStandaloneBuilder() {
+ return new EmptyBuilder()
+ .setCheckpointRecoveryFactory(new
StandaloneCheckpointRecoveryFactory())
+ .setJobGraphStore(new StandaloneJobGraphStore())
+ .setDispatcherLeaderRetriever(
+ new StandaloneLeaderRetrievalService(
+ "localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID))
+ .setResourceManagerLeaderRetriever(
+ new StandaloneLeaderRetrievalService(
+ "localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID))
+ .setClusterRestEndpointLeaderRetriever(
+ new StandaloneLeaderRetrievalService(
+ "localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID))
+ .setDispatcherLeaderElectionService(new
StandaloneLeaderElectionService())
+ .setResourceManagerLeaderElectionService(new
StandaloneLeaderElectionService())
+ .setClusterRestEndpointLeaderElectionService(new
StandaloneLeaderElectionService())
+ .setJobMasterLeaderRetrieverFunction(
+ jobId ->
+ new StandaloneLeaderRetrievalService(
+ "localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID))
+ .setJobMasterLeaderElectionServiceFunction(
+ jobId -> new StandaloneLeaderElectionService());
+ }
+
+ public static EmbeddedBuilder newEmbeddedBuilder(Executor executor) {
+ return new EmbeddedBuilder(executor);
+ }
+
+ public abstract static class Builder<T extends Builder<T>> {
+
+ protected @Nullable CheckpointRecoveryFactory
checkpointRecoveryFactory;
+ protected @Nullable JobGraphStore jobGraphStore;
+ protected JobResultStore jobResultStore = new EmbeddedJobResultStore();
+ protected CompletableFuture<Void> closeFuture = new
CompletableFuture<>();
+ protected CompletableFuture<Void> closeAndCleanupAllDataFuture = new
CompletableFuture<>();
+ protected CompletableFuture<JobID> globalCleanupFuture = new
CompletableFuture<>();
+
+ private Builder() {}
+
+ public T setCheckpointRecoveryFactory(CheckpointRecoveryFactory
checkpointRecoveryFactory) {
+ this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private volatile Function<JobID, LeaderRetrievalService>
jobMasterLeaderRetrieverFunction =
- ignored -> null;
+ public T setJobGraphStore(JobGraphStore jobGraphStore) {
+ this.jobGraphStore = jobGraphStore;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private volatile Function<JobID, LeaderElectionService>
jobMasterLeaderElectionServiceFunction =
- ignored -> null;
+ public T setJobResultStore(JobResultStore jobResultStore) {
+ this.jobResultStore = jobResultStore;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private ConcurrentHashMap<JobID, LeaderRetrievalService>
jobMasterLeaderRetrievers =
- new ConcurrentHashMap<>();
+ public T setCloseFuture(CompletableFuture<Void> closeFuture) {
+ this.closeFuture = closeFuture;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private ConcurrentHashMap<JobID, LeaderElectionService>
jobManagerLeaderElectionServices =
- new ConcurrentHashMap<>();
+ public T setCloseAndCleanupAllDataFuture(
+ CompletableFuture<Void> closeAndCleanupAllDataFuture) {
+ this.closeAndCleanupAllDataFuture = closeAndCleanupAllDataFuture;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private volatile LeaderElectionService
resourceManagerLeaderElectionService;
+ public T setGlobalCleanupFuture(CompletableFuture<JobID>
globalCleanupFuture) {
+ this.globalCleanupFuture = globalCleanupFuture;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private volatile LeaderElectionService dispatcherLeaderElectionService;
+ public abstract TestingHighAvailabilityServices build();
+ }
- private volatile LeaderElectionService
clusterRestEndpointLeaderElectionService;
+ public static class EmbeddedBuilder extends Builder<EmbeddedBuilder> {
- private volatile CheckpointRecoveryFactory checkpointRecoveryFactory;
+ private final Executor executor;
- private volatile JobGraphStore jobGraphStore;
+ private EmbeddedBuilder(Executor executor) {
+ this.executor = executor;
+ }
- private volatile JobResultStore jobResultStore = new
EmbeddedJobResultStore();
+ @Override
+ public TestingHighAvailabilityServices build() {
+ try {
+ return new EmbeddedLeaderElection(
+ checkpointRecoveryFactory,
+ jobGraphStore,
+ jobResultStore,
+ closeFuture,
+ closeAndCleanupAllDataFuture,
+ globalCleanupFuture,
+ new EmbeddedHaServicesWithLeadershipControl(executor));
+ } catch (Exception e) {
+ throw new IllegalStateException("Error building embedded
services.", e);
+ }
+ }
+ }
- private CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+ public static class EmptyBuilder extends Builder<EmptyBuilder> {
Review comment:
`BuilderWithLeadershipServices` is a more descriptive name here. WDYT?
🤔
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -162,7 +165,11 @@ public void setUp() throws Exception {
jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
jobId = jobGraph.getJobID();
jobMasterLeaderElectionService = new TestingLeaderElectionService();
- haServices.setJobMasterLeaderElectionService(jobId,
jobMasterLeaderElectionService);
+ haServicesBuilder.setJobMasterLeaderElectionServiceFunction(
+ jobId -> {
+ assertEquals(jobGraph.getJobID(), jobId);
Review comment:
nit: renaming it to `actualJobId` makes it easier to identify which one
is the expected one
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
##########
@@ -38,228 +48,457 @@
* A variant of the HighAvailabilityServices for testing. Each individual
service can be set to an
* arbitrary implementation, such as a mock or default service.
*/
-public class TestingHighAvailabilityServices implements
HighAvailabilityServices {
-
- private volatile LeaderRetrievalService resourceManagerLeaderRetriever;
-
- private volatile LeaderRetrievalService dispatcherLeaderRetriever;
-
- private volatile LeaderRetrievalService clusterRestEndpointLeaderRetriever;
+public abstract class TestingHighAvailabilityServices implements
HighAvailabilityServices {
+
+ public static EmptyBuilder newBuilder() {
+ return new EmptyBuilder();
+ }
+
+ public static EmptyBuilder newStandaloneBuilder() {
+ return new EmptyBuilder()
+ .setCheckpointRecoveryFactory(new
StandaloneCheckpointRecoveryFactory())
+ .setJobGraphStore(new StandaloneJobGraphStore())
+ .setDispatcherLeaderRetriever(
+ new StandaloneLeaderRetrievalService(
+ "localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID))
+ .setResourceManagerLeaderRetriever(
+ new StandaloneLeaderRetrievalService(
+ "localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID))
+ .setClusterRestEndpointLeaderRetriever(
+ new StandaloneLeaderRetrievalService(
+ "localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID))
+ .setDispatcherLeaderElectionService(new
StandaloneLeaderElectionService())
+ .setResourceManagerLeaderElectionService(new
StandaloneLeaderElectionService())
+ .setClusterRestEndpointLeaderElectionService(new
StandaloneLeaderElectionService())
+ .setJobMasterLeaderRetrieverFunction(
+ jobId ->
+ new StandaloneLeaderRetrievalService(
+ "localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID))
+ .setJobMasterLeaderElectionServiceFunction(
+ jobId -> new StandaloneLeaderElectionService());
+ }
+
+ public static EmbeddedBuilder newEmbeddedBuilder(Executor executor) {
+ return new EmbeddedBuilder(executor);
+ }
+
+ public abstract static class Builder<T extends Builder<T>> {
+
+ protected @Nullable CheckpointRecoveryFactory
checkpointRecoveryFactory;
+ protected @Nullable JobGraphStore jobGraphStore;
+ protected JobResultStore jobResultStore = new EmbeddedJobResultStore();
Review comment:
I think we shouldn't initialize the `JobResultStore` here with a default
value. We should have dedicated locations for the initialization of all these
components. Either it's in the `Builder` implementation (like we do it right
now for the `JobResultStore`) or in the `newStandaloneBuilder()` factory method
(like we do it for the `CheckpointRecoveryFactory` and the `JobGraphStore`) - I'd
prefer the latter. That leaves the reader with a single location to check for
default behavior.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
##########
@@ -74,7 +75,7 @@
@Before
public void setUp() throws Exception {
super.setUp();
- haServices.setCheckpointRecoveryFactory(
+ haServicesBuilder.setCheckpointRecoveryFactory(
new
PerJobCheckpointRecoveryFactory<EmbeddedCompletedCheckpointStore>(
(maxCheckpoints, previous, sharedStateRegistryFactory,
ioExecutor) -> {
if (previous != null) {
Review comment:
I guess that function can be refactored similarly to the one in
`EmbeddedHaServicesWithLeadershipControl` 😇
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
##########
@@ -38,228 +48,457 @@
* A variant of the HighAvailabilityServices for testing. Each individual
service can be set to an
* arbitrary implementation, such as a mock or default service.
*/
-public class TestingHighAvailabilityServices implements
HighAvailabilityServices {
-
- private volatile LeaderRetrievalService resourceManagerLeaderRetriever;
-
- private volatile LeaderRetrievalService dispatcherLeaderRetriever;
-
- private volatile LeaderRetrievalService clusterRestEndpointLeaderRetriever;
+public abstract class TestingHighAvailabilityServices implements
HighAvailabilityServices {
+
+ public static EmptyBuilder newBuilder() {
+ return new EmptyBuilder();
+ }
+
+ public static EmptyBuilder newStandaloneBuilder() {
+ return new EmptyBuilder()
+ .setCheckpointRecoveryFactory(new
StandaloneCheckpointRecoveryFactory())
+ .setJobGraphStore(new StandaloneJobGraphStore())
+ .setDispatcherLeaderRetriever(
+ new StandaloneLeaderRetrievalService(
+ "localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID))
+ .setResourceManagerLeaderRetriever(
+ new StandaloneLeaderRetrievalService(
+ "localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID))
+ .setClusterRestEndpointLeaderRetriever(
+ new StandaloneLeaderRetrievalService(
+ "localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID))
+ .setDispatcherLeaderElectionService(new
StandaloneLeaderElectionService())
+ .setResourceManagerLeaderElectionService(new
StandaloneLeaderElectionService())
+ .setClusterRestEndpointLeaderElectionService(new
StandaloneLeaderElectionService())
+ .setJobMasterLeaderRetrieverFunction(
+ jobId ->
+ new StandaloneLeaderRetrievalService(
+ "localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID))
+ .setJobMasterLeaderElectionServiceFunction(
+ jobId -> new StandaloneLeaderElectionService());
+ }
+
+ public static EmbeddedBuilder newEmbeddedBuilder(Executor executor) {
+ return new EmbeddedBuilder(executor);
+ }
+
+ public abstract static class Builder<T extends Builder<T>> {
+
+ protected @Nullable CheckpointRecoveryFactory
checkpointRecoveryFactory;
+ protected @Nullable JobGraphStore jobGraphStore;
+ protected JobResultStore jobResultStore = new EmbeddedJobResultStore();
+ protected CompletableFuture<Void> closeFuture = new
CompletableFuture<>();
+ protected CompletableFuture<Void> closeAndCleanupAllDataFuture = new
CompletableFuture<>();
+ protected CompletableFuture<JobID> globalCleanupFuture = new
CompletableFuture<>();
+
+ private Builder() {}
+
+ public T setCheckpointRecoveryFactory(CheckpointRecoveryFactory
checkpointRecoveryFactory) {
+ this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private volatile Function<JobID, LeaderRetrievalService>
jobMasterLeaderRetrieverFunction =
- ignored -> null;
+ public T setJobGraphStore(JobGraphStore jobGraphStore) {
+ this.jobGraphStore = jobGraphStore;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private volatile Function<JobID, LeaderElectionService>
jobMasterLeaderElectionServiceFunction =
- ignored -> null;
+ public T setJobResultStore(JobResultStore jobResultStore) {
+ this.jobResultStore = jobResultStore;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private ConcurrentHashMap<JobID, LeaderRetrievalService>
jobMasterLeaderRetrievers =
- new ConcurrentHashMap<>();
+ public T setCloseFuture(CompletableFuture<Void> closeFuture) {
+ this.closeFuture = closeFuture;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private ConcurrentHashMap<JobID, LeaderElectionService>
jobManagerLeaderElectionServices =
- new ConcurrentHashMap<>();
+ public T setCloseAndCleanupAllDataFuture(
+ CompletableFuture<Void> closeAndCleanupAllDataFuture) {
+ this.closeAndCleanupAllDataFuture = closeAndCleanupAllDataFuture;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private volatile LeaderElectionService
resourceManagerLeaderElectionService;
+ public T setGlobalCleanupFuture(CompletableFuture<JobID>
globalCleanupFuture) {
+ this.globalCleanupFuture = globalCleanupFuture;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private volatile LeaderElectionService dispatcherLeaderElectionService;
+ public abstract TestingHighAvailabilityServices build();
+ }
- private volatile LeaderElectionService
clusterRestEndpointLeaderElectionService;
+ public static class EmbeddedBuilder extends Builder<EmbeddedBuilder> {
- private volatile CheckpointRecoveryFactory checkpointRecoveryFactory;
+ private final Executor executor;
- private volatile JobGraphStore jobGraphStore;
+ private EmbeddedBuilder(Executor executor) {
+ this.executor = executor;
+ }
- private volatile JobResultStore jobResultStore = new
EmbeddedJobResultStore();
+ @Override
+ public TestingHighAvailabilityServices build() {
+ try {
+ return new EmbeddedLeaderElection(
+ checkpointRecoveryFactory,
+ jobGraphStore,
+ jobResultStore,
+ closeFuture,
+ closeAndCleanupAllDataFuture,
+ globalCleanupFuture,
+ new EmbeddedHaServicesWithLeadershipControl(executor));
+ } catch (Exception e) {
+ throw new IllegalStateException("Error building embedded
services.", e);
+ }
+ }
+ }
- private CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+ public static class EmptyBuilder extends Builder<EmptyBuilder> {
+
+ private LeaderRetrievalService dispatcherLeaderRetriever;
+ private LeaderRetrievalService resourceManagerLeaderRetriever;
+ private LeaderRetrievalService clusterRestEndpointLeaderRetriever;
+
+ private LeaderElectionService resourceManagerLeaderElectionService;
+ private LeaderElectionService dispatcherLeaderElectionService;
+ private LeaderElectionService clusterRestEndpointLeaderElectionService;
+
+ private Function<JobID, LeaderRetrievalService>
jobMasterLeaderRetrieverFunction =
+ ignored -> null;
+ private Function<JobID, LeaderElectionService>
jobMasterLeaderElectionServiceFunction =
+ ignored -> null;
+
+ private EmptyBuilder() {}
+
+ public TestingHighAvailabilityServices build() {
+ return new ManualLeaderElection(
+ checkpointRecoveryFactory,
+ jobGraphStore,
+ jobResultStore,
+ closeFuture,
+ closeAndCleanupAllDataFuture,
+ globalCleanupFuture,
+ dispatcherLeaderRetriever,
+ resourceManagerLeaderRetriever,
+ clusterRestEndpointLeaderRetriever,
+ dispatcherLeaderElectionService,
+ resourceManagerLeaderElectionService,
+ clusterRestEndpointLeaderElectionService,
+ jobMasterLeaderRetrieverFunction,
+ jobMasterLeaderElectionServiceFunction);
+ }
- private CompletableFuture<Void> closeAndCleanupAllDataFuture = new
CompletableFuture<>();
+ public EmptyBuilder setResourceManagerLeaderRetriever(
+ LeaderRetrievalService resourceManagerLeaderRetriever) {
+ this.resourceManagerLeaderRetriever =
resourceManagerLeaderRetriever;
+ return this;
+ }
- private volatile CompletableFuture<JobID> globalCleanupFuture;
+ public EmptyBuilder setDispatcherLeaderRetriever(
+ LeaderRetrievalService dispatcherLeaderRetriever) {
+ this.dispatcherLeaderRetriever = dispatcherLeaderRetriever;
+ return this;
+ }
- // ------------------------------------------------------------------------
- // Setters for mock / testing implementations
- // ------------------------------------------------------------------------
+ public EmptyBuilder setClusterRestEndpointLeaderRetriever(
+ final LeaderRetrievalService
clusterRestEndpointLeaderRetriever) {
+ this.clusterRestEndpointLeaderRetriever =
clusterRestEndpointLeaderRetriever;
+ return this;
+ }
- public void setResourceManagerLeaderRetriever(
- LeaderRetrievalService resourceManagerLeaderRetriever) {
- this.resourceManagerLeaderRetriever = resourceManagerLeaderRetriever;
- }
+ public EmptyBuilder setResourceManagerLeaderElectionService(
+ LeaderElectionService leaderElectionService) {
+ this.resourceManagerLeaderElectionService = leaderElectionService;
+ return this;
+ }
- public void setDispatcherLeaderRetriever(LeaderRetrievalService
dispatcherLeaderRetriever) {
- this.dispatcherLeaderRetriever = dispatcherLeaderRetriever;
- }
+ public EmptyBuilder setDispatcherLeaderElectionService(
+ LeaderElectionService leaderElectionService) {
+ this.dispatcherLeaderElectionService = leaderElectionService;
+ return this;
+ }
- public void setClusterRestEndpointLeaderRetriever(
- final LeaderRetrievalService clusterRestEndpointLeaderRetriever) {
- this.clusterRestEndpointLeaderRetriever =
clusterRestEndpointLeaderRetriever;
- }
+ public EmptyBuilder setClusterRestEndpointLeaderElectionService(
+ final LeaderElectionService
clusterRestEndpointLeaderElectionService) {
+ this.clusterRestEndpointLeaderElectionService =
+ clusterRestEndpointLeaderElectionService;
+ return this;
+ }
- public void setJobMasterLeaderRetriever(
- JobID jobID, LeaderRetrievalService jobMasterLeaderRetriever) {
- this.jobMasterLeaderRetrievers.put(jobID, jobMasterLeaderRetriever);
- }
+ public EmptyBuilder setJobMasterLeaderElectionServiceFunction(
+ Function<JobID, LeaderElectionService>
jobMasterLeaderElectionServiceFunction) {
+ this.jobMasterLeaderElectionServiceFunction =
jobMasterLeaderElectionServiceFunction;
+ return this;
+ }
- public void setJobMasterLeaderElectionService(
- JobID jobID, LeaderElectionService leaderElectionService) {
- this.jobManagerLeaderElectionServices.put(jobID,
leaderElectionService);
+ public EmptyBuilder setJobMasterLeaderRetrieverFunction(
+ Function<JobID, LeaderRetrievalService>
jobMasterLeaderRetrieverFunction) {
+ this.jobMasterLeaderRetrieverFunction =
jobMasterLeaderRetrieverFunction;
+ return this;
+ }
}
- public void setResourceManagerLeaderElectionService(
- LeaderElectionService leaderElectionService) {
- this.resourceManagerLeaderElectionService = leaderElectionService;
- }
+ public static class EmbeddedLeaderElection extends
TestingHighAvailabilityServices {
+
+ private final EmbeddedHaServicesWithLeadershipControl embeddedServices;
+
+ private EmbeddedLeaderElection(
+ @Nullable CheckpointRecoveryFactory checkpointRecoveryFactory,
+ @Nullable JobGraphStore jobGraphStore,
+ JobResultStore jobResultStore,
+ CompletableFuture<Void> closeFuture,
+ CompletableFuture<Void> closeAndCleanupAllDataFuture,
+ CompletableFuture<JobID> globalCleanupFuture,
+ EmbeddedHaServicesWithLeadershipControl embeddedServices)
+ throws Exception {
+ super(
+ MoreObjects.firstNonNull(
+ checkpointRecoveryFactory,
+ embeddedServices.getCheckpointRecoveryFactory()),
+ MoreObjects.firstNonNull(jobGraphStore,
embeddedServices.getJobGraphStore()),
+ jobResultStore,
+ closeFuture,
+ closeAndCleanupAllDataFuture,
+ globalCleanupFuture);
+ this.embeddedServices = embeddedServices;
+ }
- public void setDispatcherLeaderElectionService(LeaderElectionService
leaderElectionService) {
- this.dispatcherLeaderElectionService = leaderElectionService;
- }
+ @Override
+ public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+ return embeddedServices.getResourceManagerLeaderRetriever();
+ }
- public void setClusterRestEndpointLeaderElectionService(
- final LeaderElectionService
clusterRestEndpointLeaderElectionService) {
- this.clusterRestEndpointLeaderElectionService =
clusterRestEndpointLeaderElectionService;
- }
+ @Override
+ public LeaderRetrievalService getDispatcherLeaderRetriever() {
+ return embeddedServices.getDispatcherLeaderRetriever();
+ }
- public void setCheckpointRecoveryFactory(CheckpointRecoveryFactory
checkpointRecoveryFactory) {
- this.checkpointRecoveryFactory = checkpointRecoveryFactory;
- }
+ @Override
+ public LeaderRetrievalService getJobManagerLeaderRetriever(JobID
jobId) {
+ return embeddedServices.getJobManagerLeaderRetriever(jobId);
+ }
- public void setJobGraphStore(JobGraphStore jobGraphStore) {
- this.jobGraphStore = jobGraphStore;
- }
+ @Override
+ public LeaderRetrievalService getJobManagerLeaderRetriever(
+ JobID jobId, String defaultJobManagerAddress) {
+ return embeddedServices.getJobManagerLeaderRetriever(jobId,
defaultJobManagerAddress);
+ }
- public void setJobResultStore(JobResultStore jobResultStore) {
- this.jobResultStore = jobResultStore;
- }
+ @Override
+ public LeaderElectionService getResourceManagerLeaderElectionService()
{
+ return embeddedServices.getResourceManagerLeaderElectionService();
+ }
- public void setJobMasterLeaderElectionServiceFunction(
- Function<JobID, LeaderElectionService>
jobMasterLeaderElectionServiceFunction) {
- this.jobMasterLeaderElectionServiceFunction =
jobMasterLeaderElectionServiceFunction;
- }
+ @Override
+ public LeaderElectionService getDispatcherLeaderElectionService() {
+ return embeddedServices.getResourceManagerLeaderElectionService();
+ }
- public void setJobMasterLeaderRetrieverFunction(
- Function<JobID, LeaderRetrievalService>
jobMasterLeaderRetrieverFunction) {
- this.jobMasterLeaderRetrieverFunction =
jobMasterLeaderRetrieverFunction;
- }
+ @Override
+ public LeaderElectionService getJobManagerLeaderElectionService(JobID
jobID) {
+ return embeddedServices.getJobManagerLeaderElectionService(jobID);
+ }
- public void setCloseFuture(CompletableFuture<Void> closeFuture) {
- this.closeFuture = closeFuture;
- }
+ @Override
+ public LeaderElectionService
getClusterRestEndpointLeaderElectionService() {
+ return
embeddedServices.getClusterRestEndpointLeaderElectionService();
+ }
- public void setCloseAndCleanupAllDataFuture(
- CompletableFuture<Void> closeAndCleanupAllDataFuture) {
- this.closeAndCleanupAllDataFuture = closeAndCleanupAllDataFuture;
- }
+ @Override
+ public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
+ return embeddedServices.getClusterRestEndpointLeaderRetriever();
+ }
- public void setGlobalCleanupFuture(CompletableFuture<JobID>
globalCleanupFuture) {
- this.globalCleanupFuture = globalCleanupFuture;
+ @Override
+ public HaLeadershipControl getLeadershipControl() {
+ return embeddedServices;
+ }
}
- // ------------------------------------------------------------------------
- // HA Services Methods
- // ------------------------------------------------------------------------
+ public static class ManualLeaderElection extends
TestingHighAvailabilityServices {
+
+ @Nullable private final LeaderRetrievalService
dispatcherLeaderRetriever;
+ @Nullable private final LeaderRetrievalService
resourceManagerLeaderRetriever;
+ @Nullable private final LeaderRetrievalService
clusterRestEndpointLeaderRetriever;
+ @Nullable private final LeaderElectionService
dispatcherLeaderElectionService;
+ @Nullable private final LeaderElectionService
resourceManagerLeaderElectionService;
+ @Nullable private final LeaderElectionService
clusterRestEndpointLeaderElectionService;
+
+ private final ConcurrentHashMap<JobID, LeaderRetrievalService>
jobMasterLeaderRetrievers =
+ new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<JobID, LeaderElectionService>
+ jobManagerLeaderElectionServices = new ConcurrentHashMap<>();
+
+ private final Function<JobID, LeaderRetrievalService>
jobMasterLeaderRetrieverFunction;
+ private final Function<JobID, LeaderElectionService>
jobMasterLeaderElectionServiceFunction;
+
+ private ManualLeaderElection(
+ @Nullable CheckpointRecoveryFactory checkpointRecoveryFactory,
+ @Nullable JobGraphStore jobGraphStore,
+ JobResultStore jobResultStore,
+ CompletableFuture<Void> closeFuture,
+ CompletableFuture<Void> closeAndCleanupAllDataFuture,
+ CompletableFuture<JobID> globalCleanupFuture,
+ @Nullable LeaderRetrievalService dispatcherLeaderRetriever,
+ @Nullable LeaderRetrievalService
resourceManagerLeaderRetriever,
+ @Nullable LeaderRetrievalService
clusterRestEndpointLeaderRetriever,
+ @Nullable LeaderElectionService
dispatcherLeaderElectionService,
+ @Nullable LeaderElectionService
resourceManagerLeaderElectionService,
+ @Nullable LeaderElectionService
clusterRestEndpointLeaderElectionService,
+ Function<JobID, LeaderRetrievalService>
jobMasterLeaderRetrieverFunction,
+ Function<JobID, LeaderElectionService>
jobMasterLeaderElectionServiceFunction) {
+ super(
+ checkpointRecoveryFactory,
+ jobGraphStore,
+ jobResultStore,
+ closeFuture,
+ closeAndCleanupAllDataFuture,
+ globalCleanupFuture);
+ this.dispatcherLeaderRetriever = dispatcherLeaderRetriever;
+ this.resourceManagerLeaderRetriever =
resourceManagerLeaderRetriever;
+ this.clusterRestEndpointLeaderRetriever =
clusterRestEndpointLeaderRetriever;
+ this.dispatcherLeaderElectionService =
dispatcherLeaderElectionService;
+ this.resourceManagerLeaderElectionService =
resourceManagerLeaderElectionService;
+ this.clusterRestEndpointLeaderElectionService =
+ clusterRestEndpointLeaderElectionService;
+ this.jobMasterLeaderRetrieverFunction =
jobMasterLeaderRetrieverFunction;
+ this.jobMasterLeaderElectionServiceFunction =
jobMasterLeaderElectionServiceFunction;
+ }
- @Override
- public LeaderRetrievalService getResourceManagerLeaderRetriever() {
- LeaderRetrievalService service = this.resourceManagerLeaderRetriever;
- if (service != null) {
- return service;
- } else {
- throw new IllegalStateException("ResourceManagerLeaderRetriever
has not been set");
+ @Override
+ public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+ if (resourceManagerLeaderRetriever == null) {
Review comment:
What's the purpose of all the null checks being done in the method
implementations? Can't we do this in the constructor?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
##########
@@ -291,7 +530,8 @@ public void closeAndCleanupAllData() throws Exception {
if (globalCleanupFuture != null) {
Review comment:
`globalCleanupFuture` is actually not nullable anymore
##########
File path:
flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
##########
@@ -182,14 +183,10 @@ public void testDirtyJobResultRecoveryInApplicationMode()
throws Exception {
new JobResultEntry(
TestingJobResultStore.createSuccessfulJobResult(
ApplicationDispatcherBootstrap.ZERO_JOB_ID)));
- final EmbeddedHaServicesWithLeadershipControl haServices =
- new
EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor()) {
-
- @Override
- public JobResultStore getJobResultStore() {
- return jobResultStore;
- }
- };
+ final TestingHighAvailabilityServices haServices =
Review comment:
It looks like we could replace it by the standalone version due to
leader election functionality not being used... That's my bad - I guess I
didn't check that code path properly when creating the test
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
##########
@@ -132,12 +133,16 @@ public void
testRecoverFromCheckpointAfterJobGraphRemovalOfTerminatedJobFailed()
})
.build();
jobGraphStore.start(null);
- haServices.setJobGraphStore(jobGraphStore);
+ haServicesBuilder.setJobGraphStore(jobGraphStore);
// Construct leader election service.
final TestingLeaderElectionService leaderElectionService =
new TestingLeaderElectionService();
- haServices.setJobMasterLeaderElectionService(jobId,
leaderElectionService);
+ haServicesBuilder.setJobMasterLeaderElectionServiceFunction(
+ innerJobId -> {
+ assertEquals(jobId, innerJobId);
Review comment:
I'm not going to mention the other occurrences in the review...
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
##########
@@ -38,228 +48,457 @@
* A variant of the HighAvailabilityServices for testing. Each individual
service can be set to an
* arbitrary implementation, such as a mock or default service.
*/
-public class TestingHighAvailabilityServices implements
HighAvailabilityServices {
-
- private volatile LeaderRetrievalService resourceManagerLeaderRetriever;
-
- private volatile LeaderRetrievalService dispatcherLeaderRetriever;
-
- private volatile LeaderRetrievalService clusterRestEndpointLeaderRetriever;
+public abstract class TestingHighAvailabilityServices implements
HighAvailabilityServices {
Review comment:
After going through the usages of each of these `Builder`s, it's
essentially: `newBuilder` replaces `TestingHighAvailabilityServices`,
`newStandaloneBuilder` replaces `TestingHighAvailabilityServicesBuilder` and
`newEmbeddedBuilder` is a new implementation providing direct access to the
leader election functionality which is only used in
`ApplicationDispatcherBootstrapITCase` for now...
Considering that, the change might be good enough, but we should create a
follow-up task for refactoring the structure into an easier-to-read construct. 🤔
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
##########
@@ -291,7 +530,8 @@ public void closeAndCleanupAllData() throws Exception {
if (globalCleanupFuture != null) {
globalCleanupFuture.complete(jobID);
}
-
return FutureUtils.completedVoidFuture();
}
+
+ public abstract HaLeadershipControl getLeadershipControl();
Review comment:
As far as I can see, this method is only properly used by the
`EmbeddedLeaderElection` implementation. The `Builder` implementation could
provide another Generic Type that defines the return type of the `build()`
method. That's how we could make this method available only through the
`EmbeddedLeaderElection` returned by the `EmbeddedBuilder`
##########
File path:
flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
##########
@@ -182,14 +183,10 @@ public void testDirtyJobResultRecoveryInApplicationMode()
throws Exception {
new JobResultEntry(
TestingJobResultStore.createSuccessfulJobResult(
ApplicationDispatcherBootstrap.ZERO_JOB_ID)));
- final EmbeddedHaServicesWithLeadershipControl haServices =
- new
EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor()) {
-
- @Override
- public JobResultStore getJobResultStore() {
- return jobResultStore;
- }
- };
+ final TestingHighAvailabilityServices haServices =
Review comment:
Do we actually need the `EmbeddedHaServicesWithLeadershipControl` here?
Isn't it more about the `EmbeddedHaServices` part of
`EmbeddedHaServicesWithLeadershipControl` that is of value in this test? We
don't use the leadership control at all in the test code... 🤔
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
##########
@@ -132,12 +133,16 @@ public void
testRecoverFromCheckpointAfterJobGraphRemovalOfTerminatedJobFailed()
})
.build();
jobGraphStore.start(null);
- haServices.setJobGraphStore(jobGraphStore);
+ haServicesBuilder.setJobGraphStore(jobGraphStore);
// Construct leader election service.
final TestingLeaderElectionService leaderElectionService =
new TestingLeaderElectionService();
- haServices.setJobMasterLeaderElectionService(jobId,
leaderElectionService);
+ haServicesBuilder.setJobMasterLeaderElectionServiceFunction(
+ innerJobId -> {
+ assertEquals(jobId, innerJobId);
Review comment:
nit: Sticking to the test-related wording, the `innerJobId` could be
named `actualJobId`
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -817,12 +830,16 @@ public void testJobSuspensionWhenDispatcherIsTerminated()
throws Exception {
@Test
public void testJobStatusIsShownDuringTermination() throws Exception {
final JobID blockingId = new JobID();
Review comment:
nit: Not sure whether we want to do that as part of this PR. But moving
the `blockedJobGraph` creation to the top of this test would help us get
rid of `blockingId` and use `blockedJobGraph.getJobID()` instead. That would
make it clearer where this jobID is coming from...
##########
File path:
flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapITCase.java
##########
@@ -182,14 +183,10 @@ public void testDirtyJobResultRecoveryInApplicationMode()
throws Exception {
new JobResultEntry(
TestingJobResultStore.createSuccessfulJobResult(
ApplicationDispatcherBootstrap.ZERO_JOB_ID)));
- final EmbeddedHaServicesWithLeadershipControl haServices =
- new
EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor()) {
-
- @Override
- public JobResultStore getJobResultStore() {
- return jobResultStore;
- }
- };
+ final TestingHighAvailabilityServices haServices =
Review comment:
Looks like it's the only occurrence of `EmbeddedLeaderElection`. So, we
could remove it as well
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
##########
@@ -38,228 +48,457 @@
* A variant of the HighAvailabilityServices for testing. Each individual
service can be set to an
* arbitrary implementation, such as a mock or default service.
*/
-public class TestingHighAvailabilityServices implements
HighAvailabilityServices {
-
- private volatile LeaderRetrievalService resourceManagerLeaderRetriever;
-
- private volatile LeaderRetrievalService dispatcherLeaderRetriever;
-
- private volatile LeaderRetrievalService clusterRestEndpointLeaderRetriever;
+public abstract class TestingHighAvailabilityServices implements
HighAvailabilityServices {
Review comment:
I'm wondering whether the complexity of the `Testing*` implementation
comes from the fact that we're combining two interfaces into one. We have the
`HAServices` like `JobGraphStore`, `JobResultStore` and
`CheckpointRecoveryFactory`, which provide the information for jobs to recover,
and the "`LeadershipServices`", which are responsible for leader election. Should
we create a follow-up ticket for splitting this up to improve maintainability
of the code?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
##########
@@ -38,228 +48,457 @@
* A variant of the HighAvailabilityServices for testing. Each individual
service can be set to an
* arbitrary implementation, such as a mock or default service.
*/
-public class TestingHighAvailabilityServices implements
HighAvailabilityServices {
-
- private volatile LeaderRetrievalService resourceManagerLeaderRetriever;
-
- private volatile LeaderRetrievalService dispatcherLeaderRetriever;
-
- private volatile LeaderRetrievalService clusterRestEndpointLeaderRetriever;
+public abstract class TestingHighAvailabilityServices implements
HighAvailabilityServices {
+
+ public static EmptyBuilder newBuilder() {
+ return new EmptyBuilder();
+ }
+
+ public static EmptyBuilder newStandaloneBuilder() {
+ return new EmptyBuilder()
+ .setCheckpointRecoveryFactory(new
StandaloneCheckpointRecoveryFactory())
+ .setJobGraphStore(new StandaloneJobGraphStore())
+ .setDispatcherLeaderRetriever(
+ new StandaloneLeaderRetrievalService(
+ "localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID))
+ .setResourceManagerLeaderRetriever(
+ new StandaloneLeaderRetrievalService(
+ "localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID))
+ .setClusterRestEndpointLeaderRetriever(
+ new StandaloneLeaderRetrievalService(
+ "localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID))
+ .setDispatcherLeaderElectionService(new
StandaloneLeaderElectionService())
+ .setResourceManagerLeaderElectionService(new
StandaloneLeaderElectionService())
+ .setClusterRestEndpointLeaderElectionService(new
StandaloneLeaderElectionService())
+ .setJobMasterLeaderRetrieverFunction(
+ jobId ->
+ new StandaloneLeaderRetrievalService(
+ "localhost",
HighAvailabilityServices.DEFAULT_LEADER_ID))
+ .setJobMasterLeaderElectionServiceFunction(
+ jobId -> new StandaloneLeaderElectionService());
+ }
+
+ public static EmbeddedBuilder newEmbeddedBuilder(Executor executor) {
+ return new EmbeddedBuilder(executor);
+ }
+
+ public abstract static class Builder<T extends Builder<T>> {
+
+ protected @Nullable CheckpointRecoveryFactory
checkpointRecoveryFactory;
+ protected @Nullable JobGraphStore jobGraphStore;
+ protected JobResultStore jobResultStore = new EmbeddedJobResultStore();
+ protected CompletableFuture<Void> closeFuture = new
CompletableFuture<>();
+ protected CompletableFuture<Void> closeAndCleanupAllDataFuture = new
CompletableFuture<>();
+ protected CompletableFuture<JobID> globalCleanupFuture = new
CompletableFuture<>();
+
+ private Builder() {}
+
+ public T setCheckpointRecoveryFactory(CheckpointRecoveryFactory
checkpointRecoveryFactory) {
+ this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private volatile Function<JobID, LeaderRetrievalService>
jobMasterLeaderRetrieverFunction =
- ignored -> null;
+ public T setJobGraphStore(JobGraphStore jobGraphStore) {
+ this.jobGraphStore = jobGraphStore;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private volatile Function<JobID, LeaderElectionService>
jobMasterLeaderElectionServiceFunction =
- ignored -> null;
+ public T setJobResultStore(JobResultStore jobResultStore) {
+ this.jobResultStore = jobResultStore;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private ConcurrentHashMap<JobID, LeaderRetrievalService>
jobMasterLeaderRetrievers =
- new ConcurrentHashMap<>();
+ public T setCloseFuture(CompletableFuture<Void> closeFuture) {
+ this.closeFuture = closeFuture;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private ConcurrentHashMap<JobID, LeaderElectionService>
jobManagerLeaderElectionServices =
- new ConcurrentHashMap<>();
+ public T setCloseAndCleanupAllDataFuture(
+ CompletableFuture<Void> closeAndCleanupAllDataFuture) {
+ this.closeAndCleanupAllDataFuture = closeAndCleanupAllDataFuture;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private volatile LeaderElectionService
resourceManagerLeaderElectionService;
+ public T setGlobalCleanupFuture(CompletableFuture<JobID>
globalCleanupFuture) {
+ this.globalCleanupFuture = globalCleanupFuture;
+ @SuppressWarnings("unchecked")
+ final T cast = (T) this;
+ return cast;
+ }
- private volatile LeaderElectionService dispatcherLeaderElectionService;
+ public abstract TestingHighAvailabilityServices build();
+ }
- private volatile LeaderElectionService
clusterRestEndpointLeaderElectionService;
+ public static class EmbeddedBuilder extends Builder<EmbeddedBuilder> {
- private volatile CheckpointRecoveryFactory checkpointRecoveryFactory;
+ private final Executor executor;
- private volatile JobGraphStore jobGraphStore;
+ private EmbeddedBuilder(Executor executor) {
+ this.executor = executor;
+ }
- private volatile JobResultStore jobResultStore = new
EmbeddedJobResultStore();
+ @Override
+ public TestingHighAvailabilityServices build() {
+ try {
+ return new EmbeddedLeaderElection(
+ checkpointRecoveryFactory,
+ jobGraphStore,
+ jobResultStore,
+ closeFuture,
+ closeAndCleanupAllDataFuture,
+ globalCleanupFuture,
+ new EmbeddedHaServicesWithLeadershipControl(executor));
+ } catch (Exception e) {
+ throw new IllegalStateException("Error building embedded
services.", e);
+ }
+ }
+ }
- private CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+ public static class EmptyBuilder extends Builder<EmptyBuilder> {
+
+ private LeaderRetrievalService dispatcherLeaderRetriever;
+ private LeaderRetrievalService resourceManagerLeaderRetriever;
+ private LeaderRetrievalService clusterRestEndpointLeaderRetriever;
+
+ private LeaderElectionService resourceManagerLeaderElectionService;
+ private LeaderElectionService dispatcherLeaderElectionService;
+ private LeaderElectionService clusterRestEndpointLeaderElectionService;
+
+ private Function<JobID, LeaderRetrievalService>
jobMasterLeaderRetrieverFunction =
+ ignored -> null;
+ private Function<JobID, LeaderElectionService>
jobMasterLeaderElectionServiceFunction =
+ ignored -> null;
+
+ private EmptyBuilder() {}
+
+ public TestingHighAvailabilityServices build() {
+ return new ManualLeaderElection(
+ checkpointRecoveryFactory,
+ jobGraphStore,
+ jobResultStore,
+ closeFuture,
+ closeAndCleanupAllDataFuture,
+ globalCleanupFuture,
+ dispatcherLeaderRetriever,
+ resourceManagerLeaderRetriever,
+ clusterRestEndpointLeaderRetriever,
+ dispatcherLeaderElectionService,
+ resourceManagerLeaderElectionService,
+ clusterRestEndpointLeaderElectionService,
+ jobMasterLeaderRetrieverFunction,
+ jobMasterLeaderElectionServiceFunction);
+ }
- private CompletableFuture<Void> closeAndCleanupAllDataFuture = new
CompletableFuture<>();
+ public EmptyBuilder setResourceManagerLeaderRetriever(
+ LeaderRetrievalService resourceManagerLeaderRetriever) {
+ this.resourceManagerLeaderRetriever =
resourceManagerLeaderRetriever;
+ return this;
+ }
- private volatile CompletableFuture<JobID> globalCleanupFuture;
+ public EmptyBuilder setDispatcherLeaderRetriever(
+ LeaderRetrievalService dispatcherLeaderRetriever) {
+ this.dispatcherLeaderRetriever = dispatcherLeaderRetriever;
+ return this;
+ }
- // ------------------------------------------------------------------------
- // Setters for mock / testing implementations
- // ------------------------------------------------------------------------
+ public EmptyBuilder setClusterRestEndpointLeaderRetriever(
+ final LeaderRetrievalService
clusterRestEndpointLeaderRetriever) {
+ this.clusterRestEndpointLeaderRetriever =
clusterRestEndpointLeaderRetriever;
+ return this;
+ }
- public void setResourceManagerLeaderRetriever(
- LeaderRetrievalService resourceManagerLeaderRetriever) {
- this.resourceManagerLeaderRetriever = resourceManagerLeaderRetriever;
- }
+ public EmptyBuilder setResourceManagerLeaderElectionService(
+ LeaderElectionService leaderElectionService) {
+ this.resourceManagerLeaderElectionService = leaderElectionService;
+ return this;
+ }
- public void setDispatcherLeaderRetriever(LeaderRetrievalService
dispatcherLeaderRetriever) {
- this.dispatcherLeaderRetriever = dispatcherLeaderRetriever;
- }
+ public EmptyBuilder setDispatcherLeaderElectionService(
+ LeaderElectionService leaderElectionService) {
+ this.dispatcherLeaderElectionService = leaderElectionService;
+ return this;
+ }
- public void setClusterRestEndpointLeaderRetriever(
- final LeaderRetrievalService clusterRestEndpointLeaderRetriever) {
- this.clusterRestEndpointLeaderRetriever =
clusterRestEndpointLeaderRetriever;
- }
+ public EmptyBuilder setClusterRestEndpointLeaderElectionService(
+ final LeaderElectionService
clusterRestEndpointLeaderElectionService) {
+ this.clusterRestEndpointLeaderElectionService =
+ clusterRestEndpointLeaderElectionService;
+ return this;
+ }
- public void setJobMasterLeaderRetriever(
- JobID jobID, LeaderRetrievalService jobMasterLeaderRetriever) {
- this.jobMasterLeaderRetrievers.put(jobID, jobMasterLeaderRetriever);
- }
+ public EmptyBuilder setJobMasterLeaderElectionServiceFunction(
+ Function<JobID, LeaderElectionService>
jobMasterLeaderElectionServiceFunction) {
+ this.jobMasterLeaderElectionServiceFunction =
jobMasterLeaderElectionServiceFunction;
+ return this;
+ }
- public void setJobMasterLeaderElectionService(
- JobID jobID, LeaderElectionService leaderElectionService) {
- this.jobManagerLeaderElectionServices.put(jobID,
leaderElectionService);
+ public EmptyBuilder setJobMasterLeaderRetrieverFunction(
+ Function<JobID, LeaderRetrievalService>
jobMasterLeaderRetrieverFunction) {
+ this.jobMasterLeaderRetrieverFunction =
jobMasterLeaderRetrieverFunction;
+ return this;
+ }
}
- public void setResourceManagerLeaderElectionService(
- LeaderElectionService leaderElectionService) {
- this.resourceManagerLeaderElectionService = leaderElectionService;
- }
+ public static class EmbeddedLeaderElection extends
TestingHighAvailabilityServices {
+
+ private final EmbeddedHaServicesWithLeadershipControl embeddedServices;
+
+ private EmbeddedLeaderElection(
+ @Nullable CheckpointRecoveryFactory checkpointRecoveryFactory,
+ @Nullable JobGraphStore jobGraphStore,
+ JobResultStore jobResultStore,
+ CompletableFuture<Void> closeFuture,
+ CompletableFuture<Void> closeAndCleanupAllDataFuture,
+ CompletableFuture<JobID> globalCleanupFuture,
+ EmbeddedHaServicesWithLeadershipControl embeddedServices)
+ throws Exception {
+ super(
+ MoreObjects.firstNonNull(
+ checkpointRecoveryFactory,
+ embeddedServices.getCheckpointRecoveryFactory()),
+ MoreObjects.firstNonNull(jobGraphStore,
embeddedServices.getJobGraphStore()),
+ jobResultStore,
+ closeFuture,
+ closeAndCleanupAllDataFuture,
+ globalCleanupFuture);
+ this.embeddedServices = embeddedServices;
+ }
- public void setDispatcherLeaderElectionService(LeaderElectionService
leaderElectionService) {
- this.dispatcherLeaderElectionService = leaderElectionService;
- }
+ @Override
+ public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+ return embeddedServices.getResourceManagerLeaderRetriever();
+ }
- public void setClusterRestEndpointLeaderElectionService(
- final LeaderElectionService
clusterRestEndpointLeaderElectionService) {
- this.clusterRestEndpointLeaderElectionService =
clusterRestEndpointLeaderElectionService;
- }
+ @Override
+ public LeaderRetrievalService getDispatcherLeaderRetriever() {
+ return embeddedServices.getDispatcherLeaderRetriever();
+ }
- public void setCheckpointRecoveryFactory(CheckpointRecoveryFactory
checkpointRecoveryFactory) {
- this.checkpointRecoveryFactory = checkpointRecoveryFactory;
- }
+ @Override
+ public LeaderRetrievalService getJobManagerLeaderRetriever(JobID
jobId) {
+ return embeddedServices.getJobManagerLeaderRetriever(jobId);
+ }
- public void setJobGraphStore(JobGraphStore jobGraphStore) {
- this.jobGraphStore = jobGraphStore;
- }
+ @Override
+ public LeaderRetrievalService getJobManagerLeaderRetriever(
+ JobID jobId, String defaultJobManagerAddress) {
+ return embeddedServices.getJobManagerLeaderRetriever(jobId,
defaultJobManagerAddress);
+ }
- public void setJobResultStore(JobResultStore jobResultStore) {
- this.jobResultStore = jobResultStore;
- }
+ @Override
+ public LeaderElectionService getResourceManagerLeaderElectionService()
{
+ return embeddedServices.getResourceManagerLeaderElectionService();
+ }
- public void setJobMasterLeaderElectionServiceFunction(
- Function<JobID, LeaderElectionService>
jobMasterLeaderElectionServiceFunction) {
- this.jobMasterLeaderElectionServiceFunction =
jobMasterLeaderElectionServiceFunction;
- }
+ @Override
+ public LeaderElectionService getDispatcherLeaderElectionService() {
+ return embeddedServices.getResourceManagerLeaderElectionService();
+ }
- public void setJobMasterLeaderRetrieverFunction(
- Function<JobID, LeaderRetrievalService>
jobMasterLeaderRetrieverFunction) {
- this.jobMasterLeaderRetrieverFunction =
jobMasterLeaderRetrieverFunction;
- }
+ @Override
+ public LeaderElectionService getJobManagerLeaderElectionService(JobID
jobID) {
+ return embeddedServices.getJobManagerLeaderElectionService(jobID);
+ }
- public void setCloseFuture(CompletableFuture<Void> closeFuture) {
- this.closeFuture = closeFuture;
- }
+ @Override
+ public LeaderElectionService
getClusterRestEndpointLeaderElectionService() {
+ return
embeddedServices.getClusterRestEndpointLeaderElectionService();
+ }
- public void setCloseAndCleanupAllDataFuture(
- CompletableFuture<Void> closeAndCleanupAllDataFuture) {
- this.closeAndCleanupAllDataFuture = closeAndCleanupAllDataFuture;
- }
+ @Override
+ public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
+ return embeddedServices.getClusterRestEndpointLeaderRetriever();
+ }
- public void setGlobalCleanupFuture(CompletableFuture<JobID>
globalCleanupFuture) {
- this.globalCleanupFuture = globalCleanupFuture;
+ @Override
+ public HaLeadershipControl getLeadershipControl() {
+ return embeddedServices;
+ }
}
- // ------------------------------------------------------------------------
- // HA Services Methods
- // ------------------------------------------------------------------------
+ public static class ManualLeaderElection extends
TestingHighAvailabilityServices {
+
+ @Nullable private final LeaderRetrievalService
dispatcherLeaderRetriever;
+ @Nullable private final LeaderRetrievalService
resourceManagerLeaderRetriever;
+ @Nullable private final LeaderRetrievalService
clusterRestEndpointLeaderRetriever;
+ @Nullable private final LeaderElectionService
dispatcherLeaderElectionService;
+ @Nullable private final LeaderElectionService
resourceManagerLeaderElectionService;
+ @Nullable private final LeaderElectionService
clusterRestEndpointLeaderElectionService;
+
+ private final ConcurrentHashMap<JobID, LeaderRetrievalService>
jobMasterLeaderRetrievers =
+ new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<JobID, LeaderElectionService>
+ jobManagerLeaderElectionServices = new ConcurrentHashMap<>();
+
+ private final Function<JobID, LeaderRetrievalService>
jobMasterLeaderRetrieverFunction;
+ private final Function<JobID, LeaderElectionService>
jobMasterLeaderElectionServiceFunction;
+
+ private ManualLeaderElection(
+ @Nullable CheckpointRecoveryFactory checkpointRecoveryFactory,
+ @Nullable JobGraphStore jobGraphStore,
+ JobResultStore jobResultStore,
+ CompletableFuture<Void> closeFuture,
+ CompletableFuture<Void> closeAndCleanupAllDataFuture,
+ CompletableFuture<JobID> globalCleanupFuture,
+ @Nullable LeaderRetrievalService dispatcherLeaderRetriever,
+ @Nullable LeaderRetrievalService
resourceManagerLeaderRetriever,
+ @Nullable LeaderRetrievalService
clusterRestEndpointLeaderRetriever,
+ @Nullable LeaderElectionService
dispatcherLeaderElectionService,
+ @Nullable LeaderElectionService
resourceManagerLeaderElectionService,
+ @Nullable LeaderElectionService
clusterRestEndpointLeaderElectionService,
+ Function<JobID, LeaderRetrievalService>
jobMasterLeaderRetrieverFunction,
+ Function<JobID, LeaderElectionService>
jobMasterLeaderElectionServiceFunction) {
+ super(
+ checkpointRecoveryFactory,
+ jobGraphStore,
+ jobResultStore,
+ closeFuture,
+ closeAndCleanupAllDataFuture,
+ globalCleanupFuture);
+ this.dispatcherLeaderRetriever = dispatcherLeaderRetriever;
+ this.resourceManagerLeaderRetriever =
resourceManagerLeaderRetriever;
+ this.clusterRestEndpointLeaderRetriever =
clusterRestEndpointLeaderRetriever;
+ this.dispatcherLeaderElectionService =
dispatcherLeaderElectionService;
+ this.resourceManagerLeaderElectionService =
resourceManagerLeaderElectionService;
+ this.clusterRestEndpointLeaderElectionService =
+ clusterRestEndpointLeaderElectionService;
+ this.jobMasterLeaderRetrieverFunction =
jobMasterLeaderRetrieverFunction;
+ this.jobMasterLeaderElectionServiceFunction =
jobMasterLeaderElectionServiceFunction;
+ }
- @Override
- public LeaderRetrievalService getResourceManagerLeaderRetriever() {
- LeaderRetrievalService service = this.resourceManagerLeaderRetriever;
- if (service != null) {
- return service;
- } else {
- throw new IllegalStateException("ResourceManagerLeaderRetriever
has not been set");
+ @Override
+ public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+ if (resourceManagerLeaderRetriever == null) {
+ throw new
IllegalStateException("ResourceManagerLeaderRetriever has not been set");
+ }
+ return resourceManagerLeaderRetriever;
}
- }
- @Override
- public LeaderRetrievalService getDispatcherLeaderRetriever() {
- LeaderRetrievalService service = this.dispatcherLeaderRetriever;
- if (service != null) {
- return service;
- } else {
- throw new IllegalStateException("ResourceManagerLeaderRetriever
has not been set");
+ @Override
+ public LeaderRetrievalService getDispatcherLeaderRetriever() {
+ if (dispatcherLeaderRetriever == null) {
+ throw new
IllegalStateException("ResourceManagerLeaderRetriever has not been set");
+ }
+ return dispatcherLeaderRetriever;
}
- }
- @Override
- public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
- LeaderRetrievalService service =
- jobMasterLeaderRetrievers.computeIfAbsent(jobID,
jobMasterLeaderRetrieverFunction);
- if (service != null) {
+ @Override
+ public LeaderRetrievalService getJobManagerLeaderRetriever(JobID
jobID) {
+ @Nullable
+ final LeaderRetrievalService service =
+ jobMasterLeaderRetrievers.computeIfAbsent(
+ jobID, jobMasterLeaderRetrieverFunction);
+ if (service == null) {
+ throw new IllegalStateException("JobMasterLeaderRetriever has
not been set");
+ }
return service;
- } else {
- throw new IllegalStateException("JobMasterLeaderRetriever has not
been set");
}
- }
- @Override
- public LeaderRetrievalService getJobManagerLeaderRetriever(
- JobID jobID, String defaultJobManagerAddress) {
- return getJobManagerLeaderRetriever(jobID);
- }
+ @Override
+ public LeaderRetrievalService getJobManagerLeaderRetriever(
+ JobID jobID, String defaultJobManagerAddress) {
+ return getJobManagerLeaderRetriever(jobID);
+ }
- @Override
- public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
- return clusterRestEndpointLeaderRetriever;
- }
+ @Override
+ public LeaderElectionService getResourceManagerLeaderElectionService()
{
+ if (resourceManagerLeaderElectionService == null) {
+ throw new IllegalStateException(
+ "ResourceManagerLeaderElectionService has not been
set");
+ }
+ return resourceManagerLeaderElectionService;
+ }
- @Override
- public LeaderElectionService getResourceManagerLeaderElectionService() {
- LeaderElectionService service = resourceManagerLeaderElectionService;
+ @Override
+ public LeaderElectionService getDispatcherLeaderElectionService() {
+ if (dispatcherLeaderElectionService == null) {
+ throw new
IllegalStateException("DispatcherLeaderElectionService has not been set");
+ }
+ return dispatcherLeaderElectionService;
+ }
- if (service != null) {
+ @Override
+ public LeaderElectionService getJobManagerLeaderElectionService(JobID
jobID) {
+ @Nullable
+ final LeaderElectionService service =
+ jobManagerLeaderElectionServices.computeIfAbsent(
+ jobID, jobMasterLeaderElectionServiceFunction);
+ if (service == null) {
+ throw new
IllegalStateException("JobMasterLeaderElectionService has not been set");
+ }
return service;
- } else {
- throw new IllegalStateException(
- "ResourceManagerLeaderElectionService has not been set");
}
- }
-
- @Override
- public LeaderElectionService getDispatcherLeaderElectionService() {
- LeaderElectionService service = dispatcherLeaderElectionService;
- if (service != null) {
- return service;
- } else {
- throw new IllegalStateException("DispatcherLeaderElectionService
has not been set");
+ @Override
+ public LeaderElectionService
getClusterRestEndpointLeaderElectionService() {
+ if (clusterRestEndpointLeaderElectionService == null) {
+ throw new IllegalStateException(
+ "ClusterRestEndpointLeaderElectionService has not been
set");
+ }
+ return dispatcherLeaderElectionService;
}
- }
- @Override
- public LeaderElectionService getJobManagerLeaderElectionService(JobID
jobID) {
- LeaderElectionService service =
- jobManagerLeaderElectionServices.computeIfAbsent(
- jobID, jobMasterLeaderElectionServiceFunction);
+ @Override
+ public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
+ if (clusterRestEndpointLeaderRetriever == null) {
+ throw new IllegalStateException(
+ "ClusterRestEndpointLeaderRetriever has not been set");
+ }
+ return dispatcherLeaderRetriever;
+ }
- if (service != null) {
- return service;
- } else {
- throw new IllegalStateException("JobMasterLeaderElectionService
has not been set");
+ @Override
+ public HaLeadershipControl getLeadershipControl() {
+ throw new UnsupportedOperationException();
}
}
- @Override
- public LeaderElectionService getClusterRestEndpointLeaderElectionService()
{
- return clusterRestEndpointLeaderElectionService;
+ @Nullable protected final CheckpointRecoveryFactory
checkpointRecoveryFactory;
+ @Nullable private final JobGraphStore jobGraphStore;
+ private final JobResultStore jobResultStore;
+ private final CompletableFuture<Void> closeFuture;
+ private final CompletableFuture<Void> closeAndCleanupAllDataFuture;
+ private final CompletableFuture<JobID> globalCleanupFuture;
Review comment:
I guess you didn't add the `blobStore` here as a member because this PR
is more about uniting functionality and not adding new things?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
##########
@@ -114,9 +114,19 @@ private TaskSubmissionTestEnvironment(
ShuffleEnvironment<?, ?> shuffleEnvironment)
throws Exception {
- this.haServices = new TestingHighAvailabilityServices();
- this.haServices.setResourceManagerLeaderRetriever(new
SettableLeaderRetrievalService());
- this.haServices.setJobMasterLeaderRetriever(jobId, new
SettableLeaderRetrievalService());
+ this.haServices =
+ TestingHighAvailabilityServices.newBuilder()
+ .setResourceManagerLeaderRetriever(new
SettableLeaderRetrievalService())
+ .setJobMasterLeaderRetrieverFunction(
Review comment:
What about providing a utility method:
```
withJobMasterLeaderRetrievalService(JobID expectedJobId,
Supplier<LeaderRetrievalService> serviceSupplier) {
return innerJobId -> {
if (!expectedJobId.equals(innerJobId)) {
throw new IllegalArgumentException(
String.format("Expected %s, got %s.", expectedJobId, innerJobId));
}
return serviceSupplier.get();
};
}
```
...since it's used in quite a few places (with `assertEquals`).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]