This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new 1afe691  [FLINK-20468][minicluster] Enable leadership control in 
MiniCluster to test JM failover
1afe691 is described below

commit 1afe691ed51d32cd6dc04fafc322e2f540b186d8
Author: Andrey Zagrebin <[email protected]>
AuthorDate: Thu Dec 3 11:37:14 2020 +0300

    [FLINK-20468][minicluster] Enable leadership control in MiniCluster to test 
JM failover
    
    Currently, there is no easy way to test how JM failover (revoke and grant 
leadership)
    affects other features with the MiniCluster and its testing resource rule.
    Custom HA services can be provided to the TestingMiniCluster, but there 
are no simple HA services
    to support revoking and granting leadership with a valid in-memory 
checkpoint store.
    Providing a way to enable such embedded HA services for the MiniCluster out 
of the box allows implementing IT cases similar to E2E tests.
    
    This closes #14301.
---
 .../file/src/FileSourceTextLinesITCase.java        | 137 +++------------------
 .../EmbeddedCompletedCheckpointStore.java}         |  43 +++----
 .../PerJobCheckpointRecoveryFactory.java           |  68 ++++++++++
 .../EmbeddedHaServicesWithLeadershipControl.java}  |  28 ++++-
 .../nonha/embedded/HaLeadershipControl.java}       |  30 ++---
 .../flink/runtime/minicluster/MiniCluster.java     |  55 ++++++++-
 .../minicluster/MiniClusterConfiguration.java      |  20 ++-
 .../CheckpointCoordinatorRestoringTest.java        |   5 +-
 .../checkpoint/CheckpointCoordinatorTest.java      |   3 +-
 .../flink/runtime/jobmaster/JobMasterTest.java     |   9 +-
 .../LeaderChangeClusterComponentsTest.java         |   6 +-
 .../TestingMiniClusterConfiguration.java           |   2 +-
 .../runtime/testutils/MiniClusterResource.java     |   1 +
 .../MiniClusterResourceConfiguration.java          |  34 ++++-
 .../util/MiniClusterResourceConfiguration.java     |   3 +-
 .../NotifyCheckpointAbortedITCase.java             |   4 +-
 .../test/checkpointing/RegionFailoverITCase.java   |   4 +-
 17 files changed, 262 insertions(+), 190 deletions(-)

diff --git 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
index 9075258..03dadbb 100644
--- 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
+++ 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
@@ -20,31 +20,19 @@ package org.apache.flink.connector.file.src;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.file.src.reader.TextLineFormat;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
-import 
org.apache.flink.runtime.highavailability.nonha.embedded.TestingEmbeddedHaServices;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
-import org.apache.flink.runtime.minicluster.TestingMiniCluster;
-import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.collect.ClientAndIterator;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.FunctionWithException;
 
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -63,9 +51,7 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Supplier;
 import java.util.zip.GZIPOutputStream;
 
 import static org.hamcrest.Matchers.equalTo;
@@ -82,54 +68,15 @@ public class FileSourceTextLinesITCase extends TestLogger {
        @ClassRule
        public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
 
-       private static TestingMiniCluster miniCluster;
-
-       private static TestingEmbeddedHaServices highAvailabilityServices;
-
-       private static CompletedCheckpointStore checkpointStore;
-
-       @BeforeClass
-       public static void setupMiniCluster() throws Exception  {
-               highAvailabilityServices = new 
HaServices(TestingUtils.defaultExecutor(),
-                       () -> checkpointStore,
-                       new StandaloneCheckpointIDCounter());
-
-               final Configuration configuration = createConfiguration();
-
-               miniCluster = new TestingMiniCluster(
-                       new TestingMiniClusterConfiguration.Builder()
-                               .setConfiguration(configuration)
-                               .setNumTaskManagers(1)
-                               .setNumSlotsPerTaskManager(PARALLELISM)
-                               
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
-                               .build(),
-                       () -> highAvailabilityServices);
-
-               miniCluster.start();
-       }
-
-       private static Configuration createConfiguration() throws IOException {
-               final Configuration configuration = new Configuration();
-               final String checkPointDir = 
Path.fromLocalFile(TMP_FOLDER.newFolder()).toUri().toString();
-               configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkPointDir);
-               return configuration;
-       }
-
-       @Before
-       public void setup() {
-               checkpointStore = new RecoverableCompletedCheckpointStore();
-       }
-
-       @AfterClass
-       public static void shutdownMiniCluster() throws Exception {
-               if (miniCluster != null) {
-                       miniCluster.close();
-               }
-               if (highAvailabilityServices != null) {
-                       highAvailabilityServices.closeAndCleanupAllData();
-                       highAvailabilityServices = null;
-               }
-       }
+       @ClassRule
+       public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE 
= new MiniClusterWithClientResource(
+               new MiniClusterResourceConfiguration
+                       .Builder()
+                       .setNumberTaskManagers(1)
+                       .setNumberSlotsPerTaskManager(PARALLELISM)
+                       .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                       .withHaLeadershipControl()
+                       .build());
 
        // 
------------------------------------------------------------------------
        //  test cases
@@ -174,7 +121,7 @@ public class FileSourceTextLinesITCase extends TestLogger {
                        .forRecordStreamFormat(new TextLineFormat(), 
Path.fromLocalFile(testDir))
                        .build();
 
-               final StreamExecutionEnvironment env = new 
TestStreamEnvironment(miniCluster, PARALLELISM);
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(PARALLELISM);
 
                final DataStream<String> stream = env.fromSource(
@@ -236,7 +183,7 @@ public class FileSourceTextLinesITCase extends TestLogger {
                        .monitorContinuously(Duration.ofMillis(5))
                        .build();
 
-               final StreamExecutionEnvironment env = new 
TestStreamEnvironment(miniCluster, PARALLELISM);
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(PARALLELISM);
                env.enableCheckpointing(10L);
 
@@ -308,15 +255,17 @@ public class FileSourceTextLinesITCase extends TestLogger 
{
        }
 
        private static void triggerJobManagerFailover(JobID jobId, Runnable 
afterFailAction) throws Exception {
-               highAvailabilityServices.revokeJobMasterLeadership(jobId).get();
+               final HaLeadershipControl haLeadershipControl =
+                       
MINI_CLUSTER_RESOURCE.getMiniCluster().getHaLeadershipControl().get();
+               haLeadershipControl.revokeJobMasterLeadership(jobId).get();
                afterFailAction.run();
-               highAvailabilityServices.grantJobMasterLeadership(jobId).get();
+               haLeadershipControl.grantJobMasterLeadership(jobId).get();
        }
 
        private static void restartTaskManager(Runnable afterFailAction) throws 
Exception {
-               miniCluster.terminateTaskManager(0).get();
+               
MINI_CLUSTER_RESOURCE.getMiniCluster().terminateTaskManager(0).get();
                afterFailAction.run();
-               miniCluster.startTaskManager();
+               MINI_CLUSTER_RESOURCE.getMiniCluster().startTaskManager();
        }
 
        // 
------------------------------------------------------------------------
@@ -483,52 +432,6 @@ public class FileSourceTextLinesITCase extends TestLogger {
        //  mini cluster failover utilities
        // 
------------------------------------------------------------------------
 
-       private static class HaServices extends TestingEmbeddedHaServices {
-               private final Supplier<CompletedCheckpointStore> 
completedCheckpointStoreFactory;
-               private final CheckpointIDCounter checkpointIDCounter;
-
-               private HaServices(
-                               Executor executor,
-                               Supplier<CompletedCheckpointStore> 
completedCheckpointStoreFactory,
-                               CheckpointIDCounter checkpointIDCounter) {
-                       super(executor);
-                       this.completedCheckpointStoreFactory = 
completedCheckpointStoreFactory;
-                       this.checkpointIDCounter = checkpointIDCounter;
-               }
-
-               @Override
-               public CheckpointRecoveryFactory getCheckpointRecoveryFactory() 
{
-                       return new CheckpointRecoveryFactoryWithSettableStore(
-                               completedCheckpointStoreFactory,
-                               checkpointIDCounter);
-               }
-       }
-
-       private static class CheckpointRecoveryFactoryWithSettableStore 
implements CheckpointRecoveryFactory {
-               private final Supplier<CompletedCheckpointStore> 
completedCheckpointStoreFactory;
-               private final CheckpointIDCounter checkpointIDCounter;
-
-               private CheckpointRecoveryFactoryWithSettableStore(
-                               Supplier<CompletedCheckpointStore> 
completedCheckpointStoreFactory,
-                               CheckpointIDCounter checkpointIDCounter) {
-                       this.completedCheckpointStoreFactory = 
completedCheckpointStoreFactory;
-                       this.checkpointIDCounter = checkpointIDCounter;
-               }
-
-               @Override
-               public CompletedCheckpointStore createCheckpointStore(
-                               JobID jobId,
-                               int maxNumberOfCheckpointsToRetain,
-                               ClassLoader userClassLoader) {
-                       return completedCheckpointStoreFactory.get();
-               }
-
-               @Override
-               public CheckpointIDCounter createCheckpointIDCounter(JobID 
jobId) {
-                       return checkpointIDCounter;
-               }
-       }
-
        private static class RecordCounterToFail {
 
                private static AtomicInteger records;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
similarity index 64%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
index ba2dec0..643761f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
@@ -16,53 +16,49 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.testutils;
+package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.util.Preconditions;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
 /**
- * A checkpoint store, which supports shutdown and suspend. You can use this 
to test HA
- * as long as the factory always returns the same store instance.
+ * An embedded in-memory checkpoint store, which supports shutdown and suspend.
+ * You can use this to test HA as long as the factory always returns the same 
store instance.
  */
-public class RecoverableCompletedCheckpointStore implements 
CompletedCheckpointStore {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(RecoverableCompletedCheckpointStore.class);
+public class EmbeddedCompletedCheckpointStore implements 
CompletedCheckpointStore {
 
        private final ArrayDeque<CompletedCheckpoint> checkpoints = new 
ArrayDeque<>(2);
 
-       private final ArrayDeque<CompletedCheckpoint> suspended = new 
ArrayDeque<>(2);
+       private final Collection<CompletedCheckpoint> suspended = new 
ArrayDeque<>(2);
 
        private final int maxRetainedCheckpoints;
 
-       public RecoverableCompletedCheckpointStore() {
+       public EmbeddedCompletedCheckpointStore() {
                this(1);
        }
 
-       public RecoverableCompletedCheckpointStore(int maxRetainedCheckpoints) {
+       EmbeddedCompletedCheckpointStore(int maxRetainedCheckpoints) {
                Preconditions.checkArgument(maxRetainedCheckpoints > 0);
                this.maxRetainedCheckpoints = maxRetainedCheckpoints;
        }
 
        @Override
-       public void recover() throws Exception {
+       public void recover() {
                checkpoints.addAll(suspended);
                suspended.clear();
        }
 
        @Override
-       public void addCheckpoint(CompletedCheckpoint checkpoint, 
CheckpointsCleaner checkpointsCleaner, Runnable postCleanup) throws Exception {
-
+       public void addCheckpoint(
+                       CompletedCheckpoint checkpoint,
+                       CheckpointsCleaner checkpointsCleaner,
+                       Runnable postCleanup) throws Exception {
                checkpoints.addLast(checkpoint);
 
                if (checkpoints.size() > maxRetainedCheckpoints) {
@@ -70,7 +66,8 @@ public class RecoverableCompletedCheckpointStore implements 
CompletedCheckpointS
                }
        }
 
-       public void removeOldestCheckpoint() throws Exception {
+       @VisibleForTesting
+       void removeOldestCheckpoint() throws Exception {
                CompletedCheckpoint checkpointToSubsume = 
checkpoints.removeFirst();
                checkpointToSubsume.discardOnSubsume();
        }
@@ -82,17 +79,13 @@ public class RecoverableCompletedCheckpointStore implements 
CompletedCheckpointS
                        suspended.clear();
                } else {
                        suspended.clear();
-
-                       for (CompletedCheckpoint checkpoint : checkpoints) {
-                               suspended.add(checkpoint);
-                       }
-
+                       suspended.addAll(checkpoints);
                        checkpoints.clear();
                }
        }
 
        @Override
-       public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
+       public List<CompletedCheckpoint> getAllCheckpoints() {
                return new ArrayList<>(checkpoints);
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
new file mode 100644
index 0000000..aef4a2c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Simple {@link CheckpointRecoveryFactory} which creates and keeps separate
+ * {@link CompletedCheckpointStore} and {@link CheckpointIDCounter} for each 
{@link JobID}.
+ */
+public class PerJobCheckpointRecoveryFactory implements 
CheckpointRecoveryFactory {
+       private final Function<Integer, CompletedCheckpointStore> 
completedCheckpointStorePerJobFactory;
+       private final Supplier<CheckpointIDCounter> 
checkpointIDCounterPerJobFactory;
+       private final Map<JobID, CompletedCheckpointStore> store;
+       private final Map<JobID, CheckpointIDCounter> counter;
+
+       public PerJobCheckpointRecoveryFactory(
+                       Function<Integer, CompletedCheckpointStore> 
completedCheckpointStorePerJobFactory,
+                       Supplier<CheckpointIDCounter> 
checkpointIDCounterPerJobFactory) {
+               this.completedCheckpointStorePerJobFactory = 
completedCheckpointStorePerJobFactory;
+               this.checkpointIDCounterPerJobFactory = 
checkpointIDCounterPerJobFactory;
+               this.store = new HashMap<>();
+               this.counter = new HashMap<>();
+       }
+
+       @Override
+       public CompletedCheckpointStore createCheckpointStore(
+                       JobID jobId,
+                       int maxNumberOfCheckpointsToRetain,
+                       ClassLoader userClassLoader) {
+               return store.computeIfAbsent(jobId, jId ->
+                       
completedCheckpointStorePerJobFactory.apply(maxNumberOfCheckpointsToRetain));
+       }
+
+       @Override
+       public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) {
+               return counter.computeIfAbsent(jobId, jId -> 
checkpointIDCounterPerJobFactory.get());
+       }
+
+       @VisibleForTesting
+       public static CheckpointRecoveryFactory useSameServicesForAllJobs(
+                       CompletedCheckpointStore store,
+                       CheckpointIDCounter counter) {
+               return new PerJobCheckpointRecoveryFactory(n -> store, () -> 
counter);
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/TestingEmbeddedHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java
similarity index 68%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/TestingEmbeddedHaServices.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java
index 364f9f3..b50f870 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/TestingEmbeddedHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java
@@ -19,46 +19,68 @@
 package org.apache.flink.runtime.highavailability.nonha.embedded;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 /**
- * {@link EmbeddedHaServices} extension for testing purposes.
+ * {@link EmbeddedHaServices} extension to expose leadership granting and 
revoking.
  */
-public class TestingEmbeddedHaServices extends EmbeddedHaServices {
+public class EmbeddedHaServicesWithLeadershipControl extends 
EmbeddedHaServices implements HaLeadershipControl {
+       private final CheckpointRecoveryFactory 
testingCheckpointRecoveryFactory;
 
-       public TestingEmbeddedHaServices(Executor executor) {
+       public EmbeddedHaServicesWithLeadershipControl(Executor executor) {
                super(executor);
+               this.testingCheckpointRecoveryFactory = new 
PerJobCheckpointRecoveryFactory(
+                       n -> new EmbeddedCompletedCheckpointStore(),
+                       StandaloneCheckpointIDCounter::new);
        }
 
+       @Override
        public CompletableFuture<Void> revokeDispatcherLeadership() {
                final EmbeddedLeaderService dispatcherLeaderService = 
getDispatcherLeaderService();
                return dispatcherLeaderService.revokeLeadership();
        }
 
+       @Override
        public CompletableFuture<Void> grantDispatcherLeadership() {
                final EmbeddedLeaderService dispatcherLeaderService = 
getDispatcherLeaderService();
                return dispatcherLeaderService.grantLeadership();
        }
 
+       @Override
        public CompletableFuture<Void> revokeJobMasterLeadership(JobID jobId) {
                final EmbeddedLeaderService jobMasterLeaderService = 
getJobManagerLeaderService(jobId);
                return jobMasterLeaderService.revokeLeadership();
        }
 
+       @Override
        public CompletableFuture<Void> grantJobMasterLeadership(JobID jobId) {
                final EmbeddedLeaderService jobMasterLeaderService = 
getJobManagerLeaderService(jobId);
                return jobMasterLeaderService.grantLeadership();
        }
 
+       @Override
        public CompletableFuture<Void> revokeResourceManagerLeadership() {
                final EmbeddedLeaderService resourceManagerLeaderService = 
getResourceManagerLeaderService();
                return resourceManagerLeaderService.revokeLeadership();
        }
 
+       @Override
        public CompletableFuture<Void> grantResourceManagerLeadership() {
                final EmbeddedLeaderService resourceManagerLeaderService = 
getResourceManagerLeaderService();
                return resourceManagerLeaderService.grantLeadership();
        }
+
+       @Override
+       public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
+               synchronized (lock) {
+                       checkNotShutdown();
+                       return testingCheckpointRecoveryFactory;
+               }
+       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/HaLeadershipControl.java
similarity index 51%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/HaLeadershipControl.java
index 7bc0c85..f0754e6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/HaLeadershipControl.java
@@ -16,31 +16,25 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.checkpoint;
+package org.apache.flink.runtime.highavailability.nonha.embedded;
 
 import org.apache.flink.api.common.JobID;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
- * Simple {@link CheckpointRecoveryFactory} which is initialized with a
- * {@link CompletedCheckpointStore} and a {@link CheckpointIDCounter}.
+ * Interface to grant and revoke leadership of HA components.
  */
-public class TestingCheckpointRecoveryFactory implements 
CheckpointRecoveryFactory {
+public interface HaLeadershipControl {
+       CompletableFuture<Void> revokeDispatcherLeadership();
+
+       CompletableFuture<Void> grantDispatcherLeadership();
 
-       private final CompletedCheckpointStore store;
-       private final CheckpointIDCounter counter;
+       CompletableFuture<Void> revokeJobMasterLeadership(JobID jobId);
 
-       public TestingCheckpointRecoveryFactory(CompletedCheckpointStore store, 
CheckpointIDCounter counter) {
-               this.store = store;
-               this.counter = counter;
-       }
+       CompletableFuture<Void> grantJobMasterLeadership(JobID jobId);
 
-       @Override
-       public CompletedCheckpointStore createCheckpointStore(JobID jobId, int 
maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) throws Exception {
-               return store;
-       }
+       CompletableFuture<Void> revokeResourceManagerLeadership();
 
-       @Override
-       public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) 
throws Exception {
-               return counter;
-       }
+       CompletableFuture<Void> grantResourceManagerLeadership();
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index b3900a3..21323bb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -28,6 +28,8 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.BlobClient;
@@ -52,6 +54,8 @@ import 
org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
@@ -102,6 +106,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -428,11 +433,35 @@ public class MiniCluster implements AutoCloseableAsync {
        }
 
        @VisibleForTesting
-       protected HighAvailabilityServices 
createHighAvailabilityServices(Configuration configuration, Executor executor) 
throws Exception {
+       protected HighAvailabilityServices createHighAvailabilityServices(
+                       Configuration configuration,
+                       Executor executor) throws Exception {
                LOG.info("Starting high-availability services");
-               return 
HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
-                       configuration,
-                       executor);
+               final HaServices haServices = 
miniClusterConfiguration.getHaServices();
+               switch (haServices) {
+                       case WITH_LEADERSHIP_CONTROL:
+                               return new 
EmbeddedHaServicesWithLeadershipControl(executor);
+                       case CONFIGURED:
+                               return 
HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration, 
executor);
+                       default:
+                               throw new IllegalConfigurationException("Unkown 
HA Services " + haServices);
+               }
+       }
+
+       /**
+        * Returns {@link HaLeadershipControl} if enabled.
+        *
+        * <p>{@link HaLeadershipControl} allows granting and revoking 
leadership of HA components,
+        * e.g. JobManager. The method returns {@link Optional#empty()} if the 
control is not enabled in
+        * {@link MiniClusterConfiguration}.
+        *
+        * <p>Enabling this feature disables {@link 
HighAvailabilityOptions#HA_MODE} option.
+        */
+       public Optional<HaLeadershipControl> getHaLeadershipControl() {
+               synchronized (lock) {
+                       return haServices instanceof HaLeadershipControl ?
+                               Optional.of((HaLeadershipControl) haServices) : 
Optional.empty();
+               }
        }
 
        /**
@@ -1049,4 +1078,22 @@ public class MiniCluster implements AutoCloseableAsync {
                        return new TerminatingFatalErrorHandler(index);
                }
        }
+
+       /**
+        * HA Services to use.
+        */
+       public enum HaServices {
+               /**
+                * Uses the configured HA Services in {@link 
HighAvailabilityOptions#HA_MODE} option.
+                */
+               CONFIGURED,
+
+               /**
+                * Enables {@link HaLeadershipControl} in {@link 
MiniCluster#getHaLeadershipControl}.
+                *
+                * <p>{@link HaLeadershipControl} allows granting and revoking 
leadership of HA components.
+                * Enabling this feature disables {@link 
HighAvailabilityOptions#HA_MODE} option.
+                */
+               WITH_LEADERSHIP_CONTROL
+       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index a31e08f..07bc37f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -46,6 +46,8 @@ public class MiniClusterConfiguration {
        @Nullable
        private final String commonBindAddress;
 
+       private final MiniCluster.HaServices haServices;
+
        // 
------------------------------------------------------------------------
        //  Construction
        // 
------------------------------------------------------------------------
@@ -54,12 +56,13 @@ public class MiniClusterConfiguration {
                        Configuration configuration,
                        int numTaskManagers,
                        RpcServiceSharing rpcServiceSharing,
-                       @Nullable String commonBindAddress) {
-
+                       @Nullable String commonBindAddress,
+                       MiniCluster.HaServices haServices) {
                this.numTaskManagers = numTaskManagers;
                this.configuration = 
generateConfiguration(Preconditions.checkNotNull(configuration));
                this.rpcServiceSharing = 
Preconditions.checkNotNull(rpcServiceSharing);
                this.commonBindAddress = commonBindAddress;
+               this.haServices = haServices;
        }
 
        private UnmodifiableConfiguration generateConfiguration(final 
Configuration configuration) {
@@ -122,6 +125,10 @@ public class MiniClusterConfiguration {
                return configuration;
        }
 
+       public MiniCluster.HaServices getHaServices() {
+               return haServices;
+       }
+
        @Override
        public String toString() {
                return "MiniClusterConfiguration {" +
@@ -150,6 +157,7 @@ public class MiniClusterConfiguration {
                private RpcServiceSharing rpcServiceSharing = SHARED;
                @Nullable
                private String commonBindAddress = null;
+               private MiniCluster.HaServices haServices = 
MiniCluster.HaServices.CONFIGURED;
 
                public Builder setConfiguration(Configuration configuration1) {
                        this.configuration = 
Preconditions.checkNotNull(configuration1);
@@ -176,6 +184,11 @@ public class MiniClusterConfiguration {
                        return this;
                }
 
+               public Builder setHaServices(MiniCluster.HaServices haServices) 
{
+                       this.haServices = haServices;
+                       return this;
+               }
+
                public MiniClusterConfiguration build() {
                        final Configuration modifiedConfiguration = new 
Configuration(configuration);
                        
modifiedConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
numSlotsPerTaskManager);
@@ -187,7 +200,8 @@ public class MiniClusterConfiguration {
                                modifiedConfiguration,
                                numTaskManagers,
                                rpcServiceSharing,
-                               commonBindAddress);
+                               commonBindAddress,
+                               haServices);
                }
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
index df3c567..f7a6bf1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
@@ -40,7 +40,6 @@ import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
 import org.apache.flink.util.SerializableObject;
 import org.apache.flink.util.TestLogger;
 
@@ -144,7 +143,7 @@ public class CheckpointCoordinatorRestoringTest extends 
TestLogger {
                ExecutionVertex[] arrayExecutionVertices =
                        allExecutionVertices.toArray(new 
ExecutionVertex[allExecutionVertices.size()]);
 
-               CompletedCheckpointStore store = new 
RecoverableCompletedCheckpointStore();
+               CompletedCheckpointStore store = new 
EmbeddedCompletedCheckpointStore();
 
                // set up the coordinator and validate the initial state
                CheckpointCoordinator coord =
@@ -264,7 +263,7 @@ public class CheckpointCoordinatorRestoringTest extends 
TestLogger {
                        tasks.add(stateful);
                        tasks.add(stateless);
 
-                       CompletedCheckpointStore store = new 
RecoverableCompletedCheckpointStore(2);
+                       CompletedCheckpointStore store = new 
EmbeddedCompletedCheckpointStore(2);
 
                        CheckpointCoordinatorConfiguration chkConfig =
                                new CheckpointCoordinatorConfigurationBuilder()
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index fa9984b..128b74c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -58,7 +58,6 @@ import 
org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorageAcces
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import 
org.apache.flink.runtime.state.memory.NonPersistentMetadataCheckpointStorageLocation;
 import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
-import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -2034,7 +2033,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                ExecutionVertex[] arrayExecutionVertices =
                        allExecutionVertices.toArray(new 
ExecutionVertex[allExecutionVertices.size()]);
 
-               RecoverableCompletedCheckpointStore store = new 
RecoverableCompletedCheckpointStore(10);
+               EmbeddedCompletedCheckpointStore store = new 
EmbeddedCompletedCheckpointStore(10);
 
                final List<SharedStateRegistry> createdSharedStateRegistries = 
new ArrayList<>(2);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 6b04242..0ec1f4f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
@@ -48,7 +49,6 @@ import 
org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
-import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -182,6 +182,7 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static 
org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs;
 import static 
org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewInputChannelStateHandle;
 import static 
org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewResultSubpartitionStateHandle;
 import static 
org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton;
@@ -781,7 +782,7 @@ public class JobMasterTest extends TestLogger {
                final JobGraph jobGraph = 
createJobGraphWithCheckpointing(savepointRestoreSettings);
 
                final StandaloneCompletedCheckpointStore 
completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
-               final TestingCheckpointRecoveryFactory 
testingCheckpointRecoveryFactory = new 
TestingCheckpointRecoveryFactory(completedCheckpointStore, new 
StandaloneCheckpointIDCounter());
+               final CheckpointRecoveryFactory 
testingCheckpointRecoveryFactory = 
useSameServicesForAllJobs(completedCheckpointStore, new 
StandaloneCheckpointIDCounter());
                
haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
                final JobMaster jobMaster = createJobMaster(
                        configuration,
@@ -824,7 +825,7 @@ public class JobMasterTest extends TestLogger {
                final JobGraph jobGraphWithNewOperator = 
createJobGraphFromJobVerticesWithCheckpointing(savepointRestoreSettings, 
jobVertex);
 
                final StandaloneCompletedCheckpointStore 
completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
-               final TestingCheckpointRecoveryFactory 
testingCheckpointRecoveryFactory = new 
TestingCheckpointRecoveryFactory(completedCheckpointStore, new 
StandaloneCheckpointIDCounter());
+               final CheckpointRecoveryFactory 
testingCheckpointRecoveryFactory = 
useSameServicesForAllJobs(completedCheckpointStore, new 
StandaloneCheckpointIDCounter());
                
haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
 
                try {
@@ -894,7 +895,7 @@ public class JobMasterTest extends TestLogger {
                final StandaloneCompletedCheckpointStore 
completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
                completedCheckpointStore.addCheckpoint(completedCheckpoint, new 
CheckpointsCleaner(), () -> {
                });
-               final TestingCheckpointRecoveryFactory 
testingCheckpointRecoveryFactory = new 
TestingCheckpointRecoveryFactory(completedCheckpointStore, new 
StandaloneCheckpointIDCounter());
+               final CheckpointRecoveryFactory 
testingCheckpointRecoveryFactory = 
useSameServicesForAllJobs(completedCheckpointStore, new 
StandaloneCheckpointIDCounter());
                
haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
 
                final JobMaster jobMaster = createJobMaster(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
index 2edcc23..bf1bb50 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.runtime.execution.Environment;
-import 
org.apache.flink.runtime.highavailability.nonha.embedded.TestingEmbeddedHaServices;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -64,7 +64,7 @@ public class LeaderChangeClusterComponentsTest extends 
TestLogger {
 
        private static TestingMiniCluster miniCluster;
 
-       private static TestingEmbeddedHaServices highAvailabilityServices;
+       private static EmbeddedHaServicesWithLeadershipControl 
highAvailabilityServices;
 
        private JobGraph jobGraph;
 
@@ -72,7 +72,7 @@ public class LeaderChangeClusterComponentsTest extends 
TestLogger {
 
        @BeforeClass
        public static void setupClass() throws Exception  {
-               highAvailabilityServices = new 
TestingEmbeddedHaServices(TestingUtils.defaultExecutor());
+               highAvailabilityServices = new 
EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor());
 
                miniCluster = new TestingMiniCluster(
                        new TestingMiniClusterConfiguration.Builder()
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniClusterConfiguration.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniClusterConfiguration.java
index 98a4e48..7d90b70 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniClusterConfiguration.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniClusterConfiguration.java
@@ -43,7 +43,7 @@ public class TestingMiniClusterConfiguration extends 
MiniClusterConfiguration {
                        @Nullable String commonBindAddress,
                        int numberDispatcherResourceManagerComponents,
                        boolean localCommunication) {
-               super(configuration, numTaskManagers, rpcServiceSharing, 
commonBindAddress);
+               super(configuration, numTaskManagers, rpcServiceSharing, 
commonBindAddress, MiniCluster.HaServices.CONFIGURED);
                this.numberDispatcherResourceManagerComponents = 
numberDispatcherResourceManagerComponents;
                this.localCommunication = localCommunication;
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
index aaa7f54..501cd76 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
@@ -173,6 +173,7 @@ public class MiniClusterResource extends ExternalResource {
                        
.setNumTaskManagers(miniClusterResourceConfiguration.getNumberTaskManagers())
                        
.setNumSlotsPerTaskManager(miniClusterResourceConfiguration.getNumberSlotsPerTaskManager())
                        
.setRpcServiceSharing(miniClusterResourceConfiguration.getRpcServiceSharing())
+                       
.setHaServices(miniClusterResourceConfiguration.getHaServices())
                        .build();
 
                miniCluster = new MiniCluster(miniClusterConfiguration);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java
index d54996d..5ec20a9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java
@@ -20,8 +20,11 @@ package org.apache.flink.runtime.testutils;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
+import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
 import org.apache.flink.util.Preconditions;
 
@@ -40,17 +43,22 @@ public class MiniClusterResourceConfiguration {
 
        private final RpcServiceSharing rpcServiceSharing;
 
+       private final MiniCluster.HaServices haServices;
+
        protected MiniClusterResourceConfiguration(
                Configuration configuration,
                int numberTaskManagers,
                int numberSlotsPerTaskManager,
                Time shutdownTimeout,
-               RpcServiceSharing rpcServiceSharing) {
+               RpcServiceSharing rpcServiceSharing,
+               MiniCluster.HaServices haServices) {
+
                this.configuration = new 
UnmodifiableConfiguration(Preconditions.checkNotNull(configuration));
                this.numberTaskManagers = numberTaskManagers;
                this.numberSlotsPerTaskManager = numberSlotsPerTaskManager;
                this.shutdownTimeout = 
Preconditions.checkNotNull(shutdownTimeout);
                this.rpcServiceSharing = 
Preconditions.checkNotNull(rpcServiceSharing);
+               this.haServices = haServices;
        }
 
        public Configuration getConfiguration() {
@@ -73,6 +81,10 @@ public class MiniClusterResourceConfiguration {
                return rpcServiceSharing;
        }
 
+       public MiniCluster.HaServices getHaServices() {
+               return haServices;
+       }
+
        /**
         * Builder for {@link MiniClusterResourceConfiguration}.
         */
@@ -84,6 +96,7 @@ public class MiniClusterResourceConfiguration {
                private Time shutdownTimeout = 
AkkaUtils.getTimeoutAsTime(configuration);
 
                private RpcServiceSharing rpcServiceSharing = 
RpcServiceSharing.SHARED;
+               private MiniCluster.HaServices haServices = 
MiniCluster.HaServices.CONFIGURED;
 
                public Builder setConfiguration(Configuration configuration) {
                        this.configuration = configuration;
@@ -110,8 +123,25 @@ public class MiniClusterResourceConfiguration {
                        return this;
                }
 
+               /**
+                * Enables {@link HaLeadershipControl} in {@link 
MiniCluster#getHaLeadershipControl}.
+                *
+                * <p>{@link HaLeadershipControl} allows granting and revoking 
leadership of HA components.
+                * Enabling this feature disables {@link 
HighAvailabilityOptions#HA_MODE} option.
+                */
+               public Builder withHaLeadershipControl() {
+                       this.haServices = 
MiniCluster.HaServices.WITH_LEADERSHIP_CONTROL;
+                       return this;
+               }
+
                public MiniClusterResourceConfiguration build() {
-                       return new 
MiniClusterResourceConfiguration(configuration, numberTaskManagers, 
numberSlotsPerTaskManager, shutdownTimeout, rpcServiceSharing);
+                       return new MiniClusterResourceConfiguration(
+                               configuration,
+                               numberTaskManagers,
+                               numberSlotsPerTaskManager,
+                               shutdownTimeout,
+                               rpcServiceSharing,
+                               haServices);
                }
        }
 }
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
index 8b5167a..07b45a8 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.util;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
 
 /**
@@ -38,7 +39,7 @@ public class MiniClusterResourceConfiguration extends 
org.apache.flink.runtime.t
                        int numberSlotsPerTaskManager,
                        Time shutdownTimeout,
                        RpcServiceSharing rpcServiceSharing) {
-               super(configuration, numberTaskManagers, 
numberSlotsPerTaskManager, shutdownTimeout, rpcServiceSharing);
+               super(configuration, numberTaskManagers, 
numberSlotsPerTaskManager, shutdownTimeout, rpcServiceSharing, 
MiniCluster.HaServices.CONFIGURED);
        }
 
        /**
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
index a850dc3..10eddb3 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
@@ -38,9 +38,9 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
-import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
@@ -448,7 +448,7 @@ public class NotifyCheckpointAbortedITCase extends 
TestLogger {
                @Override
                public HighAvailabilityServices createHAServices(Configuration 
configuration, Executor executor) {
                        return new TestingHaServices(
-                               new TestingCheckpointRecoveryFactory(new 
TestingCompletedCheckpointStore(), new StandaloneCheckpointIDCounter()),
+                               
PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs(new 
TestingCompletedCheckpointStore(), new StandaloneCheckpointIDCounter()),
                                executor);
                }
        }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
index 976b895..918105b 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
@@ -34,9 +34,9 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
-import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
 import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
@@ -445,7 +445,7 @@ public class RegionFailoverITCase extends TestLogger {
                @Override
                public HighAvailabilityServices createHAServices(Configuration 
configuration, Executor executor) {
                        return new TestingHaServices(
-                               new TestingCheckpointRecoveryFactory(new 
TestingCompletedCheckpointStore(), new StandaloneCheckpointIDCounter()),
+                               
PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs(new 
TestingCompletedCheckpointStore(), new StandaloneCheckpointIDCounter()),
                                executor);
                }
        }

Reply via email to