tillrohrmann commented on a change in pull request #14301:
URL: https://github.com/apache/flink/pull/14301#discussion_r536125712
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
##########
@@ -29,13 +30,13 @@
* Simple {@link CheckpointRecoveryFactory} which creates a
* {@link CompletedCheckpointStore} and a {@link CheckpointIDCounter} per
{@link JobID}.
*/
-public class TestingCheckpointRecoveryFactory implements
CheckpointRecoveryFactory {
+public class PerJobCheckpointRecoveryFactory implements
CheckpointRecoveryFactory {
Review comment:
Why did you call it `PerJobCheckpointRecoveryFactory`? Is it because it
stores the services for different jobs? To me it sounds like this factory only
works for a single job.
##########
File path:
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceTextLinesITCase.java
##########
@@ -76,46 +68,15 @@
@ClassRule
public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
- private static TestingMiniCluster miniCluster;
-
- private static EmbeddedHaServicesWithLeadershipControl
highAvailabilityServices;
-
- @BeforeClass
- public static void setupMiniCluster() throws Exception {
- highAvailabilityServices =
- new
EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor());
-
- final Configuration configuration = createConfiguration();
-
- miniCluster = new TestingMiniCluster(
- new TestingMiniClusterConfiguration.Builder()
- .setConfiguration(configuration)
- .setNumTaskManagers(1)
- .setNumSlotsPerTaskManager(PARALLELISM)
-
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
- .build(),
- () -> highAvailabilityServices);
-
- miniCluster.start();
- }
-
- private static Configuration createConfiguration() throws IOException {
- final Configuration configuration = new Configuration();
- final String checkPointDir =
Path.fromLocalFile(TMP_FOLDER.newFolder()).toUri().toString();
- configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
checkPointDir);
- return configuration;
- }
-
- @AfterClass
- public static void shutdownMiniCluster() throws Exception {
- if (miniCluster != null) {
- miniCluster.close();
- }
- if (highAvailabilityServices != null) {
- highAvailabilityServices.closeAndCleanupAllData();
- highAvailabilityServices = null;
- }
- }
+ @ClassRule
+ public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE
= new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration
+ .Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(PARALLELISM)
+ .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+ .enableEmbeddedHaLeadershipControl()
Review comment:
Maybe call it `withHaLeadershipControl()`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
##########
@@ -938,6 +962,13 @@ private void terminateMiniClusterServices() throws
Exception {
}
}
+ @Nullable
+ private static BiFunction<Configuration, Executor,
HighAvailabilityServices> createHighAvailabilityServicesFactory(
+ boolean enableEmbeddedHaLeadershipControl) {
+ return enableEmbeddedHaLeadershipControl ?
+ (conf, executor) -> new
EmbeddedHaServicesWithLeadershipControl(executor) : null;
+ }
Review comment:
What do we need this method for here?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
##########
@@ -176,6 +184,11 @@ public Builder setCommonBindAddress(String
commonBindAddress) {
return this;
}
+ public Builder enableEmbeddedHaLeadershipControl(boolean
enableEmbeddedHaLeadershipControl) {
Review comment:
Maybe rename it to `withHaLeadershipControl` and add a description stating that
this overrides the HA config option.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
##########
@@ -20,27 +20,47 @@
import org.apache.flink.api.common.JobID;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
/**
- * Simple {@link CheckpointRecoveryFactory} which is initialized with a
- * {@link CompletedCheckpointStore} and a {@link CheckpointIDCounter}.
+ * Simple {@link CheckpointRecoveryFactory} which creates a
+ * {@link CompletedCheckpointStore} and a {@link CheckpointIDCounter} per
{@link JobID}.
*/
public class TestingCheckpointRecoveryFactory implements
CheckpointRecoveryFactory {
+ private final Function<Integer, CompletedCheckpointStore>
completedCheckpointStorePerJobFactory;
+ private final Supplier<CheckpointIDCounter>
checkpointIDCounterPerJobFactory;
+ private final Map<JobID, CompletedCheckpointStore> store;
+ private final Map<JobID, CheckpointIDCounter> counter;
- private final CompletedCheckpointStore store;
- private final CheckpointIDCounter counter;
-
- public TestingCheckpointRecoveryFactory(CompletedCheckpointStore store,
CheckpointIDCounter counter) {
- this.store = store;
- this.counter = counter;
+ public TestingCheckpointRecoveryFactory(
+ Function<Integer, CompletedCheckpointStore>
completedCheckpointStorePerJobFactory,
+ Supplier<CheckpointIDCounter>
checkpointIDCounterPerJobFactory) {
+ this.completedCheckpointStorePerJobFactory =
completedCheckpointStorePerJobFactory;
+ this.checkpointIDCounterPerJobFactory =
checkpointIDCounterPerJobFactory;
+ this.store = new HashMap<>();
+ this.counter = new HashMap<>();
}
@Override
- public CompletedCheckpointStore createCheckpointStore(JobID jobId, int
maxNumberOfCheckpointsToRetain, ClassLoader userClassLoader) throws Exception {
- return store;
+ public CompletedCheckpointStore createCheckpointStore(
+ JobID jobId,
+ int maxNumberOfCheckpointsToRetain,
+ ClassLoader userClassLoader) {
+ return store.computeIfAbsent(jobId, jId ->
+
completedCheckpointStorePerJobFactory.apply(maxNumberOfCheckpointsToRetain));
}
@Override
- public CheckpointIDCounter createCheckpointIDCounter(JobID jobId)
throws Exception {
- return counter;
+ public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) {
+ return counter.computeIfAbsent(jobId, jId ->
checkpointIDCounterPerJobFactory.get());
+ }
+
+ public static CheckpointRecoveryFactory createSamePerJob(
Review comment:
Maybe rename it to `useSameServicesForAllJobs()`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
##########
@@ -54,12 +56,13 @@ public MiniClusterConfiguration(
Configuration configuration,
int numTaskManagers,
RpcServiceSharing rpcServiceSharing,
- @Nullable String commonBindAddress) {
-
+ @Nullable String commonBindAddress,
+ boolean enableEmbeddedHaLeadershipControl) {
Review comment:
I generally prefer passing enums instead of booleans,
because enum values are more descriptive than `true` and `false`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
##########
@@ -428,11 +434,29 @@ DispatcherResourceManagerComponentFactory
createDispatcherResourceManagerCompone
}
@VisibleForTesting
- protected HighAvailabilityServices
createHighAvailabilityServices(Configuration configuration, Executor executor)
throws Exception {
+ protected HighAvailabilityServices createHighAvailabilityServices(
+ Configuration configuration,
+ Executor executor) throws Exception {
LOG.info("Starting high-availability services");
- return
HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
- configuration,
- executor);
+ return
miniClusterConfiguration.embeddedHaLeadershipControlEnabled() ?
+ new EmbeddedHaServicesWithLeadershipControl(executor) :
+
HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration,
executor);
+ }
+
+ /**
+ * Returns {@link HaLeadershipControl} if enabled.
+ *
+ * <p>{@link HaLeadershipControl} allows granting and revoking
leadership of HA components,
+ * e.g. JobManager. The method return {@link Optional#empty()} if the
control is not enabled in
+ * {@link MiniClusterConfiguration}.
+ *
+ * <p>Enabling this feature disables {@link
HighAvailabilityOptions#HA_MODE} option.
Review comment:
I think this paragraph belongs on the builder's `enableEmbeddedHaLeadershipControl`
method instead.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]