This is an automated email from the ASF dual-hosted git repository.
azagrebin pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
new 1afe691 [FLINK-20468][minicluster] Enable leadership control in
MiniCluster to test JM failover
1afe691 is described below
commit 1afe691ed51d32cd6dc04fafc322e2f540b186d8
Author: Andrey Zagrebin <[email protected]>
AuthorDate: Thu Dec 3 11:37:14 2020 +0300
[FLINK-20468][minicluster] Enable leadership control in MiniCluster to test
JM failover
Currently, there is no easy way to test how JM failover (revoking and granting
leadership)
affects other features with the MiniCluster and its testing resource rule.
Custom HA services can be provided to the TestingMiniCluster, but there
is no simple HA services implementation
that supports revoking and granting leadership with a valid in-memory
checkpoint store.
Providing a way to enable such embedded HA services for the MiniCluster out
of the box makes it possible to implement IT cases similar to E2E tests.
This closes #14301.
---
.../file/src/FileSourceTextLinesITCase.java | 137 +++------------------
.../EmbeddedCompletedCheckpointStore.java} | 43 +++----
.../PerJobCheckpointRecoveryFactory.java | 68 ++++++++++
.../EmbeddedHaServicesWithLeadershipControl.java} | 28 ++++-
.../nonha/embedded/HaLeadershipControl.java} | 30 ++---
.../flink/runtime/minicluster/MiniCluster.java | 55 ++++++++-
.../minicluster/MiniClusterConfiguration.java | 20 ++-
.../CheckpointCoordinatorRestoringTest.java | 5 +-
.../checkpoint/CheckpointCoordinatorTest.java | 3 +-
.../flink/runtime/jobmaster/JobMasterTest.java | 9 +-
.../LeaderChangeClusterComponentsTest.java | 6 +-
.../TestingMiniClusterConfiguration.java | 2 +-
.../runtime/testutils/MiniClusterResource.java | 1 +
.../MiniClusterResourceConfiguration.java | 34 ++++-
.../util/MiniClusterResourceConfiguration.java | 3 +-
.../NotifyCheckpointAbortedITCase.java | 4 +-
.../test/checkpointing/RegionFailoverITCase.java | 4 +-
17 files changed, 262 insertions(+), 190 deletions(-)
diff --git
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
index 9075258..03dadbb 100644
---
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
+++
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
@@ -20,31 +20,19 @@ package org.apache.flink.connector.file.src;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
-import
org.apache.flink.runtime.highavailability.nonha.embedded.TestingEmbeddedHaServices;
+import
org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
-import org.apache.flink.runtime.minicluster.TestingMiniCluster;
-import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.collect.ClientAndIterator;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.FunctionWithException;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -63,9 +51,7 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Supplier;
import java.util.zip.GZIPOutputStream;
import static org.hamcrest.Matchers.equalTo;
@@ -82,54 +68,15 @@ public class FileSourceTextLinesITCase extends TestLogger {
@ClassRule
public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
- private static TestingMiniCluster miniCluster;
-
- private static TestingEmbeddedHaServices highAvailabilityServices;
-
- private static CompletedCheckpointStore checkpointStore;
-
- @BeforeClass
- public static void setupMiniCluster() throws Exception {
- highAvailabilityServices = new
HaServices(TestingUtils.defaultExecutor(),
- () -> checkpointStore,
- new StandaloneCheckpointIDCounter());
-
- final Configuration configuration = createConfiguration();
-
- miniCluster = new TestingMiniCluster(
- new TestingMiniClusterConfiguration.Builder()
- .setConfiguration(configuration)
- .setNumTaskManagers(1)
- .setNumSlotsPerTaskManager(PARALLELISM)
-
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
- .build(),
- () -> highAvailabilityServices);
-
- miniCluster.start();
- }
-
- private static Configuration createConfiguration() throws IOException {
- final Configuration configuration = new Configuration();
- final String checkPointDir =
Path.fromLocalFile(TMP_FOLDER.newFolder()).toUri().toString();
- configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
checkPointDir);
- return configuration;
- }
-
- @Before
- public void setup() {
- checkpointStore = new RecoverableCompletedCheckpointStore();
- }
-
- @AfterClass
- public static void shutdownMiniCluster() throws Exception {
- if (miniCluster != null) {
- miniCluster.close();
- }
- if (highAvailabilityServices != null) {
- highAvailabilityServices.closeAndCleanupAllData();
- highAvailabilityServices = null;
- }
- }
+ @ClassRule
+ public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE
= new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration
+ .Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(PARALLELISM)
+ .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+ .withHaLeadershipControl()
+ .build());
//
------------------------------------------------------------------------
// test cases
@@ -174,7 +121,7 @@ public class FileSourceTextLinesITCase extends TestLogger {
.forRecordStreamFormat(new TextLineFormat(),
Path.fromLocalFile(testDir))
.build();
- final StreamExecutionEnvironment env = new
TestStreamEnvironment(miniCluster, PARALLELISM);
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARALLELISM);
final DataStream<String> stream = env.fromSource(
@@ -236,7 +183,7 @@ public class FileSourceTextLinesITCase extends TestLogger {
.monitorContinuously(Duration.ofMillis(5))
.build();
- final StreamExecutionEnvironment env = new
TestStreamEnvironment(miniCluster, PARALLELISM);
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARALLELISM);
env.enableCheckpointing(10L);
@@ -308,15 +255,17 @@ public class FileSourceTextLinesITCase extends TestLogger
{
}
private static void triggerJobManagerFailover(JobID jobId, Runnable
afterFailAction) throws Exception {
- highAvailabilityServices.revokeJobMasterLeadership(jobId).get();
+ final HaLeadershipControl haLeadershipControl =
+
MINI_CLUSTER_RESOURCE.getMiniCluster().getHaLeadershipControl().get();
+ haLeadershipControl.revokeJobMasterLeadership(jobId).get();
afterFailAction.run();
- highAvailabilityServices.grantJobMasterLeadership(jobId).get();
+ haLeadershipControl.grantJobMasterLeadership(jobId).get();
}
private static void restartTaskManager(Runnable afterFailAction) throws
Exception {
- miniCluster.terminateTaskManager(0).get();
+
MINI_CLUSTER_RESOURCE.getMiniCluster().terminateTaskManager(0).get();
afterFailAction.run();
- miniCluster.startTaskManager();
+ MINI_CLUSTER_RESOURCE.getMiniCluster().startTaskManager();
}
//
------------------------------------------------------------------------
@@ -483,52 +432,6 @@ public class FileSourceTextLinesITCase extends TestLogger {
// mini cluster failover utilities
//
------------------------------------------------------------------------
- private static class HaServices extends TestingEmbeddedHaServices {
- private final Supplier<CompletedCheckpointStore>
completedCheckpointStoreFactory;
- private final CheckpointIDCounter checkpointIDCounter;
-
- private HaServices(
- Executor executor,
- Supplier<CompletedCheckpointStore>
completedCheckpointStoreFactory,
- CheckpointIDCounter checkpointIDCounter) {
- super(executor);
- this.completedCheckpointStoreFactory =
completedCheckpointStoreFactory;
- this.checkpointIDCounter = checkpointIDCounter;
- }
-
- @Override
- public CheckpointRecoveryFactory getCheckpointRecoveryFactory()
{
- return new CheckpointRecoveryFactoryWithSettableStore(
- completedCheckpointStoreFactory,
- checkpointIDCounter);
- }
- }
-
- private static class CheckpointRecoveryFactoryWithSettableStore
implements CheckpointRecoveryFactory {
- private final Supplier<CompletedCheckpointStore>
completedCheckpointStoreFactory;
- private final CheckpointIDCounter checkpointIDCounter;
-
- private CheckpointRecoveryFactoryWithSettableStore(
- Supplier<CompletedCheckpointStore>
completedCheckpointStoreFactory,
- CheckpointIDCounter checkpointIDCounter) {
- this.completedCheckpointStoreFactory =
completedCheckpointStoreFactory;
- this.checkpointIDCounter = checkpointIDCounter;
- }
-
- @Override
- public CompletedCheckpointStore createCheckpointStore(
- JobID jobId,
- int maxNumberOfCheckpointsToRetain,
- ClassLoader userClassLoader) {
- return completedCheckpointStoreFactory.get();
- }
-
- @Override
- public CheckpointIDCounter createCheckpointIDCounter(JobID
jobId) {
- return checkpointIDCounter;
- }
- }
-
private static class RecordCounterToFail {
private static AtomicInteger records;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
similarity index 64%
rename from
flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
rename to
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
index ba2dec0..643761f 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
@@ -16,53 +16,49 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.testutils;
+package org.apache.flink.runtime.checkpoint;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
/**
- * A checkpoint store, which supports shutdown and suspend. You can use this
to test HA
- * as long as the factory always returns the same store instance.
+ * An embedded in-memory checkpoint store, which supports shutdown and suspend.
+ * You can use this to test HA as long as the factory always returns the same
store instance.
*/
-public class RecoverableCompletedCheckpointStore implements
CompletedCheckpointStore {
-
- private static final Logger LOG =
LoggerFactory.getLogger(RecoverableCompletedCheckpointStore.class);
+public class EmbeddedCompletedCheckpointStore implements
CompletedCheckpointStore {
private final ArrayDeque<CompletedCheckpoint> checkpoints = new
ArrayDeque<>(2);
- private final ArrayDeque<CompletedCheckpoint> suspended = new
ArrayDeque<>(2);
+ private final Collection<CompletedCheckpoint> suspended = new
ArrayDeque<>(2);
private final int maxRetainedCheckpoints;
- public RecoverableCompletedCheckpointStore() {
+ public EmbeddedCompletedCheckpointStore() {
this(1);
}
- public RecoverableCompletedCheckpointStore(int maxRetainedCheckpoints) {
+ EmbeddedCompletedCheckpointStore(int maxRetainedCheckpoints) {
Preconditions.checkArgument(maxRetainedCheckpoints > 0);
this.maxRetainedCheckpoints = maxRetainedCheckpoints;
}
@Override
- public void recover() throws Exception {
+ public void recover() {
checkpoints.addAll(suspended);
suspended.clear();
}
@Override
- public void addCheckpoint(CompletedCheckpoint checkpoint,
CheckpointsCleaner checkpointsCleaner, Runnable postCleanup) throws Exception {
-
+ public void addCheckpoint(
+ CompletedCheckpoint checkpoint,
+ CheckpointsCleaner checkpointsCleaner,
+ Runnable postCleanup) throws Exception {
checkpoints.addLast(checkpoint);
if (checkpoints.size() > maxRetainedCheckpoints) {
@@ -70,7 +66,8 @@ public class RecoverableCompletedCheckpointStore implements
CompletedCheckpointS
}
}
- public void removeOldestCheckpoint() throws Exception {
+ @VisibleForTesting
+ void removeOldestCheckpoint() throws Exception {
CompletedCheckpoint checkpointToSubsume =
checkpoints.removeFirst();
checkpointToSubsume.discardOnSubsume();
}
@@ -82,17 +79,13 @@ public class RecoverableCompletedCheckpointStore implements
CompletedCheckpointS
suspended.clear();
} else {
suspended.clear();
-
- for (CompletedCheckpoint checkpoint : checkpoints) {
- suspended.add(checkpoint);
- }
-
+ suspended.addAll(checkpoints);
checkpoints.clear();
}
}
@Override
- public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
+ public List<CompletedCheckpoint> getAllCheckpoints() {
return new ArrayList<>(checkpoints);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
new file mode 100644
index 0000000..aef4a2c
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Simple {@link CheckpointRecoveryFactory} which creates and keeps separate
+ * {@link CompletedCheckpointStore} and {@link CheckpointIDCounter} for each
{@link JobID}.
+ */
+public class PerJobCheckpointRecoveryFactory implements
CheckpointRecoveryFactory {
+ private final Function<Integer, CompletedCheckpointStore>
completedCheckpointStorePerJobFactory;
+ private final Supplier<CheckpointIDCounter>
checkpointIDCounterPerJobFactory;
+ private final Map<JobID, CompletedCheckpointStore> store;
+ private final Map<JobID, CheckpointIDCounter> counter;
+
+ public PerJobCheckpointRecoveryFactory(
+ Function<Integer, CompletedCheckpointStore>
completedCheckpointStorePerJobFactory,
+ Supplier<CheckpointIDCounter>
checkpointIDCounterPerJobFactory) {
+ this.completedCheckpointStorePerJobFactory =
completedCheckpointStorePerJobFactory;
+ this.checkpointIDCounterPerJobFactory =
checkpointIDCounterPerJobFactory;
+ this.store = new HashMap<>();
+ this.counter = new HashMap<>();
+ }
+
+ @Override
+ public CompletedCheckpointStore createCheckpointStore(
+ JobID jobId,
+ int maxNumberOfCheckpointsToRetain,
+ ClassLoader userClassLoader) {
+ return store.computeIfAbsent(jobId, jId ->
+
completedCheckpointStorePerJobFactory.apply(maxNumberOfCheckpointsToRetain));
+ }
+
+ @Override
+ public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) {
+ return counter.computeIfAbsent(jobId, jId ->
checkpointIDCounterPerJobFactory.get());
+ }
+
+ @VisibleForTesting
+ public static CheckpointRecoveryFactory useSameServicesForAllJobs(
+ CompletedCheckpointStore store,
+ CheckpointIDCounter counter) {
+ return new PerJobCheckpointRecoveryFactory(n -> store, () ->
counter);
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/TestingEmbeddedHaServices.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java
similarity index 68%
rename from
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/TestingEmbeddedHaServices.java
rename to
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java
index 364f9f3..b50f870 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/TestingEmbeddedHaServices.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java
@@ -19,46 +19,68 @@
package org.apache.flink.runtime.highavailability.nonha.embedded;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
/**
- * {@link EmbeddedHaServices} extension for testing purposes.
+ * {@link EmbeddedHaServices} extension to expose leadership granting and
revoking.
*/
-public class TestingEmbeddedHaServices extends EmbeddedHaServices {
+public class EmbeddedHaServicesWithLeadershipControl extends
EmbeddedHaServices implements HaLeadershipControl {
+ private final CheckpointRecoveryFactory
testingCheckpointRecoveryFactory;
- public TestingEmbeddedHaServices(Executor executor) {
+ public EmbeddedHaServicesWithLeadershipControl(Executor executor) {
super(executor);
+ this.testingCheckpointRecoveryFactory = new
PerJobCheckpointRecoveryFactory(
+ n -> new EmbeddedCompletedCheckpointStore(),
+ StandaloneCheckpointIDCounter::new);
}
+ @Override
public CompletableFuture<Void> revokeDispatcherLeadership() {
final EmbeddedLeaderService dispatcherLeaderService =
getDispatcherLeaderService();
return dispatcherLeaderService.revokeLeadership();
}
+ @Override
public CompletableFuture<Void> grantDispatcherLeadership() {
final EmbeddedLeaderService dispatcherLeaderService =
getDispatcherLeaderService();
return dispatcherLeaderService.grantLeadership();
}
+ @Override
public CompletableFuture<Void> revokeJobMasterLeadership(JobID jobId) {
final EmbeddedLeaderService jobMasterLeaderService =
getJobManagerLeaderService(jobId);
return jobMasterLeaderService.revokeLeadership();
}
+ @Override
public CompletableFuture<Void> grantJobMasterLeadership(JobID jobId) {
final EmbeddedLeaderService jobMasterLeaderService =
getJobManagerLeaderService(jobId);
return jobMasterLeaderService.grantLeadership();
}
+ @Override
public CompletableFuture<Void> revokeResourceManagerLeadership() {
final EmbeddedLeaderService resourceManagerLeaderService =
getResourceManagerLeaderService();
return resourceManagerLeaderService.revokeLeadership();
}
+ @Override
public CompletableFuture<Void> grantResourceManagerLeadership() {
final EmbeddedLeaderService resourceManagerLeaderService =
getResourceManagerLeaderService();
return resourceManagerLeaderService.grantLeadership();
}
+
+ @Override
+ public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
+ synchronized (lock) {
+ checkNotShutdown();
+ return testingCheckpointRecoveryFactory;
+ }
+ }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/HaLeadershipControl.java
similarity index 51%
rename from
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
rename to
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/HaLeadershipControl.java
index 7bc0c85..f0754e6 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/HaLeadershipControl.java
@@ -16,31 +16,25 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.checkpoint;
+package org.apache.flink.runtime.highavailability.nonha.embedded;
import org.apache.flink.api.common.JobID;
+import java.util.concurrent.CompletableFuture;
+
/**
- * Simple {@link CheckpointRecoveryFactory} which is initialized with a
- * {@link CompletedCheckpointStore} and a {@link CheckpointIDCounter}.
+ * Interface to grant and revoke leadership of HA components.
*/
-public class TestingCheckpointRecoveryFactory implements
CheckpointRecoveryFactory {
+public interface HaLeadershipControl {
+ CompletableFuture<Void> revokeDispatcherLeadership();
+
+ CompletableFuture<Void> grantDispatcherLeadership();
- private final CompletedCheckpointStore store;
- private final CheckpointIDCounter counter;
+ CompletableFuture<Void> revokeJobMasterLeadership(JobID jobId);
- public TestingCheckpointRecoveryFactory(CompletedCheckpointStore store,
CheckpointIDCounter counter) {
- this.store = store;
- this.counter = counter;
- }
+ CompletableFuture<Void> grantJobMasterLeadership(JobID jobId);
- @Override
- public CompletedCheckpointStore createCheckpointStore(JobID jobId, int
maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) throws Exception {
- return store;
- }
+ CompletableFuture<Void> revokeResourceManagerLeadership();
- @Override
- public CheckpointIDCounter createCheckpointIDCounter(JobID jobId)
throws Exception {
- return counter;
- }
+ CompletableFuture<Void> grantResourceManagerLeadership();
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index b3900a3..21323bb 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -28,6 +28,8 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.BlobClient;
@@ -52,6 +54,8 @@ import
org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
+import
org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.JobResult;
@@ -102,6 +106,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -428,11 +433,35 @@ public class MiniCluster implements AutoCloseableAsync {
}
@VisibleForTesting
- protected HighAvailabilityServices
createHighAvailabilityServices(Configuration configuration, Executor executor)
throws Exception {
+ protected HighAvailabilityServices createHighAvailabilityServices(
+ Configuration configuration,
+ Executor executor) throws Exception {
LOG.info("Starting high-availability services");
- return
HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
- configuration,
- executor);
+ final HaServices haServices =
miniClusterConfiguration.getHaServices();
+ switch (haServices) {
+ case WITH_LEADERSHIP_CONTROL:
+ return new
EmbeddedHaServicesWithLeadershipControl(executor);
+ case CONFIGURED:
+ return
HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration,
executor);
+ default:
+ throw new IllegalConfigurationException("Unkown
HA Services " + haServices);
+ }
+ }
+
+ /**
+ * Returns {@link HaLeadershipControl} if enabled.
+ *
+ * <p>{@link HaLeadershipControl} allows granting and revoking
leadership of HA components,
+ * e.g. JobManager. The method returns {@link Optional#empty()} if the
control is not enabled in
+ * {@link MiniClusterConfiguration}.
+ *
+ * <p>Enabling this feature disables {@link
HighAvailabilityOptions#HA_MODE} option.
+ */
+ public Optional<HaLeadershipControl> getHaLeadershipControl() {
+ synchronized (lock) {
+ return haServices instanceof HaLeadershipControl ?
+ Optional.of((HaLeadershipControl) haServices) :
Optional.empty();
+ }
}
/**
@@ -1049,4 +1078,22 @@ public class MiniCluster implements AutoCloseableAsync {
return new TerminatingFatalErrorHandler(index);
}
}
+
+ /**
+ * HA Services to use.
+ */
+ public enum HaServices {
+ /**
+ * Uses the configured HA Services in {@link
HighAvailabilityOptions#HA_MODE} option.
+ */
+ CONFIGURED,
+
+ /**
+ * Enables {@link HaLeadershipControl} in {@link
MiniCluster#getHaLeadershipControl}.
+ *
+ * <p>{@link HaLeadershipControl} allows granting and revoking
leadership of HA components.
+ * Enabling this feature disables {@link
HighAvailabilityOptions#HA_MODE} option.
+ */
+ WITH_LEADERSHIP_CONTROL
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index a31e08f..07bc37f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -46,6 +46,8 @@ public class MiniClusterConfiguration {
@Nullable
private final String commonBindAddress;
+ private final MiniCluster.HaServices haServices;
+
//
------------------------------------------------------------------------
// Construction
//
------------------------------------------------------------------------
@@ -54,12 +56,13 @@ public class MiniClusterConfiguration {
Configuration configuration,
int numTaskManagers,
RpcServiceSharing rpcServiceSharing,
- @Nullable String commonBindAddress) {
-
+ @Nullable String commonBindAddress,
+ MiniCluster.HaServices haServices) {
this.numTaskManagers = numTaskManagers;
this.configuration =
generateConfiguration(Preconditions.checkNotNull(configuration));
this.rpcServiceSharing =
Preconditions.checkNotNull(rpcServiceSharing);
this.commonBindAddress = commonBindAddress;
+ this.haServices = haServices;
}
private UnmodifiableConfiguration generateConfiguration(final
Configuration configuration) {
@@ -122,6 +125,10 @@ public class MiniClusterConfiguration {
return configuration;
}
+ public MiniCluster.HaServices getHaServices() {
+ return haServices;
+ }
+
@Override
public String toString() {
return "MiniClusterConfiguration {" +
@@ -150,6 +157,7 @@ public class MiniClusterConfiguration {
private RpcServiceSharing rpcServiceSharing = SHARED;
@Nullable
private String commonBindAddress = null;
+ private MiniCluster.HaServices haServices =
MiniCluster.HaServices.CONFIGURED;
public Builder setConfiguration(Configuration configuration1) {
this.configuration =
Preconditions.checkNotNull(configuration1);
@@ -176,6 +184,11 @@ public class MiniClusterConfiguration {
return this;
}
+ public Builder setHaServices(MiniCluster.HaServices haServices)
{
+ this.haServices = haServices;
+ return this;
+ }
+
public MiniClusterConfiguration build() {
final Configuration modifiedConfiguration = new
Configuration(configuration);
modifiedConfiguration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS,
numSlotsPerTaskManager);
@@ -187,7 +200,8 @@ public class MiniClusterConfiguration {
modifiedConfiguration,
numTaskManagers,
rpcServiceSharing,
- commonBindAddress);
+ commonBindAddress,
+ haServices);
}
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
index df3c567..f7a6bf1 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
@@ -40,7 +40,6 @@ import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
import org.apache.flink.util.SerializableObject;
import org.apache.flink.util.TestLogger;
@@ -144,7 +143,7 @@ public class CheckpointCoordinatorRestoringTest extends
TestLogger {
ExecutionVertex[] arrayExecutionVertices =
allExecutionVertices.toArray(new
ExecutionVertex[allExecutionVertices.size()]);
- CompletedCheckpointStore store = new
RecoverableCompletedCheckpointStore();
+ CompletedCheckpointStore store = new
EmbeddedCompletedCheckpointStore();
// set up the coordinator and validate the initial state
CheckpointCoordinator coord =
@@ -264,7 +263,7 @@ public class CheckpointCoordinatorRestoringTest extends
TestLogger {
tasks.add(stateful);
tasks.add(stateless);
- CompletedCheckpointStore store = new
RecoverableCompletedCheckpointStore(2);
+ CompletedCheckpointStore store = new
EmbeddedCompletedCheckpointStore(2);
CheckpointCoordinatorConfiguration chkConfig =
new CheckpointCoordinatorConfigurationBuilder()
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index fa9984b..128b74c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -58,7 +58,6 @@ import
org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorageAcces
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import
org.apache.flink.runtime.state.memory.NonPersistentMetadataCheckpointStorageLocation;
import
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
-import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
@@ -2034,7 +2033,7 @@ public class CheckpointCoordinatorTest extends TestLogger
{
ExecutionVertex[] arrayExecutionVertices =
allExecutionVertices.toArray(new
ExecutionVertex[allExecutionVertices.size()]);
- RecoverableCompletedCheckpointStore store = new
RecoverableCompletedCheckpointStore(10);
+ EmbeddedCompletedCheckpointStore store = new
EmbeddedCompletedCheckpointStore(10);
final List<SharedStateRegistry> createdSharedStateRegistries =
new ArrayList<>(2);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 6b04242..0ec1f4f 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
@@ -48,7 +49,6 @@ import
org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
-import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -182,6 +182,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static
org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs;
import static
org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewInputChannelStateHandle;
import static
org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewResultSubpartitionStateHandle;
import static
org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton;
@@ -781,7 +782,7 @@ public class JobMasterTest extends TestLogger {
final JobGraph jobGraph =
createJobGraphWithCheckpointing(savepointRestoreSettings);
final StandaloneCompletedCheckpointStore
completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
- final TestingCheckpointRecoveryFactory
testingCheckpointRecoveryFactory = new
TestingCheckpointRecoveryFactory(completedCheckpointStore, new
StandaloneCheckpointIDCounter());
+ final CheckpointRecoveryFactory
testingCheckpointRecoveryFactory =
useSameServicesForAllJobs(completedCheckpointStore, new
StandaloneCheckpointIDCounter());
haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
final JobMaster jobMaster = createJobMaster(
configuration,
@@ -824,7 +825,7 @@ public class JobMasterTest extends TestLogger {
final JobGraph jobGraphWithNewOperator =
createJobGraphFromJobVerticesWithCheckpointing(savepointRestoreSettings,
jobVertex);
final StandaloneCompletedCheckpointStore
completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
- final TestingCheckpointRecoveryFactory
testingCheckpointRecoveryFactory = new
TestingCheckpointRecoveryFactory(completedCheckpointStore, new
StandaloneCheckpointIDCounter());
+ final CheckpointRecoveryFactory
testingCheckpointRecoveryFactory =
useSameServicesForAllJobs(completedCheckpointStore, new
StandaloneCheckpointIDCounter());
haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
try {
@@ -894,7 +895,7 @@ public class JobMasterTest extends TestLogger {
final StandaloneCompletedCheckpointStore
completedCheckpointStore = new StandaloneCompletedCheckpointStore(1);
completedCheckpointStore.addCheckpoint(completedCheckpoint, new
CheckpointsCleaner(), () -> {
});
- final TestingCheckpointRecoveryFactory
testingCheckpointRecoveryFactory = new
TestingCheckpointRecoveryFactory(completedCheckpointStore, new
StandaloneCheckpointIDCounter());
+ final CheckpointRecoveryFactory
testingCheckpointRecoveryFactory =
useSameServicesForAllJobs(completedCheckpointStore, new
StandaloneCheckpointIDCounter());
haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
final JobMaster jobMaster = createJobMaster(
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
index 2edcc23..bf1bb50 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.runtime.execution.Environment;
-import
org.apache.flink.runtime.highavailability.nonha.embedded.TestingEmbeddedHaServices;
+import
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -64,7 +64,7 @@ public class LeaderChangeClusterComponentsTest extends
TestLogger {
private static TestingMiniCluster miniCluster;
- private static TestingEmbeddedHaServices highAvailabilityServices;
+ private static EmbeddedHaServicesWithLeadershipControl
highAvailabilityServices;
private JobGraph jobGraph;
@@ -72,7 +72,7 @@ public class LeaderChangeClusterComponentsTest extends
TestLogger {
@BeforeClass
public static void setupClass() throws Exception {
- highAvailabilityServices = new
TestingEmbeddedHaServices(TestingUtils.defaultExecutor());
+ highAvailabilityServices = new
EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor());
miniCluster = new TestingMiniCluster(
new TestingMiniClusterConfiguration.Builder()
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniClusterConfiguration.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniClusterConfiguration.java
index 98a4e48..7d90b70 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniClusterConfiguration.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniClusterConfiguration.java
@@ -43,7 +43,7 @@ public class TestingMiniClusterConfiguration extends
MiniClusterConfiguration {
@Nullable String commonBindAddress,
int numberDispatcherResourceManagerComponents,
boolean localCommunication) {
- super(configuration, numTaskManagers, rpcServiceSharing,
commonBindAddress);
+ super(configuration, numTaskManagers, rpcServiceSharing,
commonBindAddress, MiniCluster.HaServices.CONFIGURED);
this.numberDispatcherResourceManagerComponents =
numberDispatcherResourceManagerComponents;
this.localCommunication = localCommunication;
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
index aaa7f54..501cd76 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java
@@ -173,6 +173,7 @@ public class MiniClusterResource extends ExternalResource {
.setNumTaskManagers(miniClusterResourceConfiguration.getNumberTaskManagers())
.setNumSlotsPerTaskManager(miniClusterResourceConfiguration.getNumberSlotsPerTaskManager())
.setRpcServiceSharing(miniClusterResourceConfiguration.getRpcServiceSharing())
+ .setHaServices(miniClusterResourceConfiguration.getHaServices())
.build();
miniCluster = new MiniCluster(miniClusterConfiguration);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java
index d54996d..5ec20a9 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java
@@ -20,8 +20,11 @@ package org.apache.flink.runtime.testutils;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
+import
org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
+import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.util.Preconditions;
@@ -40,17 +43,22 @@ public class MiniClusterResourceConfiguration {
private final RpcServiceSharing rpcServiceSharing;
+ private final MiniCluster.HaServices haServices;
+
protected MiniClusterResourceConfiguration(
Configuration configuration,
int numberTaskManagers,
int numberSlotsPerTaskManager,
Time shutdownTimeout,
- RpcServiceSharing rpcServiceSharing) {
+ RpcServiceSharing rpcServiceSharing,
+ MiniCluster.HaServices haServices) {
+
this.configuration = new
UnmodifiableConfiguration(Preconditions.checkNotNull(configuration));
this.numberTaskManagers = numberTaskManagers;
this.numberSlotsPerTaskManager = numberSlotsPerTaskManager;
this.shutdownTimeout =
Preconditions.checkNotNull(shutdownTimeout);
this.rpcServiceSharing =
Preconditions.checkNotNull(rpcServiceSharing);
+ this.haServices = haServices;
}
public Configuration getConfiguration() {
@@ -73,6 +81,10 @@ public class MiniClusterResourceConfiguration {
return rpcServiceSharing;
}
+ public MiniCluster.HaServices getHaServices() {
+ return haServices;
+ }
+
/**
* Builder for {@link MiniClusterResourceConfiguration}.
*/
@@ -84,6 +96,7 @@ public class MiniClusterResourceConfiguration {
private Time shutdownTimeout =
AkkaUtils.getTimeoutAsTime(configuration);
private RpcServiceSharing rpcServiceSharing =
RpcServiceSharing.SHARED;
+ private MiniCluster.HaServices haServices = MiniCluster.HaServices.CONFIGURED;
public Builder setConfiguration(Configuration configuration) {
this.configuration = configuration;
@@ -110,8 +123,25 @@ public class MiniClusterResourceConfiguration {
return this;
}
+ /**
+ * Enables or disables {@link HaLeadershipControl} in {@link MiniCluster#getHaLeadershipControl}.
+ *
+ * <p>{@link HaLeadershipControl} allows granting and revoking leadership of HA components.
+ * Enabling this feature disables {@link HighAvailabilityOptions#HA_MODE} option.
+ */
+ public Builder withHaLeadershipControl() {
+ this.haServices = MiniCluster.HaServices.WITH_LEADERSHIP_CONTROL;
+ return this;
+ }
+
public MiniClusterResourceConfiguration build() {
- return new
MiniClusterResourceConfiguration(configuration, numberTaskManagers,
numberSlotsPerTaskManager, shutdownTimeout, rpcServiceSharing);
+ return new MiniClusterResourceConfiguration(
+ configuration,
+ numberTaskManagers,
+ numberSlotsPerTaskManager,
+ shutdownTimeout,
+ rpcServiceSharing,
+ haServices);
}
}
}
diff --git
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
index 8b5167a..07b45a8 100644
---
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
+++
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.util;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
/**
@@ -38,7 +39,7 @@ public class MiniClusterResourceConfiguration extends
org.apache.flink.runtime.t
int numberSlotsPerTaskManager,
Time shutdownTimeout,
RpcServiceSharing rpcServiceSharing) {
- super(configuration, numberTaskManagers,
numberSlotsPerTaskManager, shutdownTimeout, rpcServiceSharing);
+ super(configuration, numberTaskManagers,
numberSlotsPerTaskManager, shutdownTimeout, rpcServiceSharing,
MiniCluster.HaServices.CONFIGURED);
}
/**
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
index a850dc3..10eddb3 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
@@ -38,9 +38,9 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
-import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import
org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
@@ -448,7 +448,7 @@ public class NotifyCheckpointAbortedITCase extends
TestLogger {
@Override
public HighAvailabilityServices createHAServices(Configuration
configuration, Executor executor) {
return new TestingHaServices(
- new TestingCheckpointRecoveryFactory(new
TestingCompletedCheckpointStore(), new StandaloneCheckpointIDCounter()),
+ PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs(new TestingCompletedCheckpointStore(), new StandaloneCheckpointIDCounter()),
executor);
}
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
index 976b895..918105b 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
@@ -34,9 +34,9 @@ import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
-import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import
org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
import
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
@@ -445,7 +445,7 @@ public class RegionFailoverITCase extends TestLogger {
@Override
public HighAvailabilityServices createHAServices(Configuration
configuration, Executor executor) {
return new TestingHaServices(
- new TestingCheckpointRecoveryFactory(new
TestingCompletedCheckpointStore(), new StandaloneCheckpointIDCounter()),
+ PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs(new TestingCompletedCheckpointStore(), new StandaloneCheckpointIDCounter()),
executor);
}
}