dmvk commented on a change in pull request #18296:
URL: https://github.com/apache/flink/pull/18296#discussion_r794265194



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import 
org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
+import 
org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory;
+import 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.rest.JobRestEndpointFactory;
+import 
org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static 
org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH;
+import static org.junit.Assert.assertNotNull;
+
+/** An integration test which recovers from checkpoint after regaining the 
leadership. */
+public class JobDispatcherITCase extends TestLogger {
+
+    private static final Duration TIMEOUT = Duration.ofMinutes(10);
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+    private Supplier<DispatcherResourceManagerComponentFactory>
+            createJobModeDispatcherResourceManagerComponentFactorySupplier(
+                    Configuration configuration) {
+        return () -> {
+            try {
+                return new DefaultDispatcherResourceManagerComponentFactory(
+                        new DefaultDispatcherRunnerFactory(
+                                
JobDispatcherLeaderProcessFactoryFactory.create(
+                                        
FileJobGraphRetriever.createFrom(configuration, null))),
+                        StandaloneResourceManagerFactory.getInstance(),
+                        JobRestEndpointFactory.INSTANCE);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        };
+    }
+
+    @Test
+    public void testRecoverFromCheckpointAfterLosingAndRegainingLeadership() 
throws Exception {
+        final Deadline deadline = Deadline.fromNow(TIMEOUT);
+        final Configuration configuration = new Configuration();
+        configuration.set(HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.ZOOKEEPER.name());
+        final TestingMiniClusterConfiguration clusterConfiguration =
+                TestingMiniClusterConfiguration.newBuilder()
+                        .setConfiguration(configuration)
+                        .build();
+        final EmbeddedHaServicesWithLeadershipControl haServices =
+                new 
EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor());
+
+        Configuration newConfiguration = new 
Configuration(clusterConfiguration.getConfiguration());
+        long checkpointInterval = 500;
+        JobID jobID = generateJobGraph(newConfiguration, checkpointInterval);
+
+        final TestingMiniCluster.Builder clusterBuilder =
+                TestingMiniCluster.newBuilder(clusterConfiguration)
+                        .setHighAvailabilityServicesSupplier(() -> haServices)
+                        .setDispatcherResourceManagerComponentFactorySupplier(
+                                
createJobModeDispatcherResourceManagerComponentFactorySupplier(
+                                        newConfiguration));
+
+        try (final MiniCluster cluster = clusterBuilder.build()) {
+            // start mini cluster and submit the job
+            cluster.start();
+
+            // wait until job is running
+            awaitJobStatus(cluster, jobID, JobStatus.RUNNING, deadline);
+            CommonTestUtils.waitForAllTaskRunning(cluster, jobID, false);
+            CommonTestUtils.waitUntilCondition(
+                    () -> queryCompletedCheckpoints(cluster, jobID) > 0L,
+                    Deadline.fromNow(Duration.ofSeconds(30)),
+                    checkpointInterval / 2);
+
+            final CompletableFuture<JobResult> firstJobResult = 
cluster.requestJobResult(jobID);
+            haServices.revokeDispatcherLeadership();
+            // make sure the leadership is revoked to avoid race conditions
+            Assertions.assertEquals(
+                    ApplicationStatus.UNKNOWN, 
firstJobResult.get().getApplicationStatus());
+
+            haServices.grantDispatcherLeadership();
+
+            // job is suspended, wait until it's running
+            awaitJobStatus(cluster, jobID, JobStatus.RUNNING, deadline);
+
+            assertNotNull(
+                    cluster.getArchivedExecutionGraph(jobID)
+                            .get()
+                            .getCheckpointStatsSnapshot()
+                            .getLatestRestoredCheckpoint());
+
+            cluster.cancelJob(jobID);
+
+            // the cluster should shut down automatically once the application 
completes
+            CommonTestUtils.waitUntilCondition(() -> !cluster.isRunning(), 
deadline);
+        }
+    }
+
+    private JobID generateJobGraph(Configuration configuration, long 
checkpointInterval)
+            throws Exception {
+        final JobVertex jobVertex = new JobVertex("jobVertex");
+        jobVertex.setInvokableClass(
+                
AdaptiveSchedulerClusterITCase.CheckpointingNoOpInvokable.class);
+        jobVertex.setParallelism(1);
+
+        final CheckpointCoordinatorConfiguration 
checkpointCoordinatorConfiguration =
+                CheckpointCoordinatorConfiguration.builder()
+                        .setCheckpointInterval(checkpointInterval)
+                        .build();
+        final JobCheckpointingSettings checkpointingSettings =
+                new 
JobCheckpointingSettings(checkpointCoordinatorConfiguration, null);
+        JobGraph jobGraph =

Review comment:
       nit: all other local variables in this method are marked final
   ```suggestion
           final JobGraph jobGraph =
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
##########
@@ -201,6 +203,37 @@ public void testTerminationAfterJobCompletion() throws 
Exception {
         }
     }
 
+    /**
+     * Tests that in detached mode, the {@link MiniDispatcher} will not 
complete the future that
+     * signals job termination if the JobStatus is not globally terminal state.
+     */
+    @Test
+    public void testNotTerminationWithoutGloballyTerminalState() throws 
Exception {
+        final MiniDispatcher miniDispatcher =
+                createMiniDispatcher(ClusterEntrypoint.ExecutionMode.DETACHED);
+        miniDispatcher.start();
+
+        try {
+            // wait until we have submitted the job
+            final TestingJobManagerRunner testingJobManagerRunner =
+                    
testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
+
+            testingJobManagerRunner.completeResultFuture(
+                    new ExecutionGraphInfo(
+                            new ArchivedExecutionGraphBuilder()
+                                    .setJobID(jobGraph.getJobID())
+                                    .setState(JobStatus.SUSPENDED)
+                                    .build()));
+
+            miniDispatcher.getShutDownFuture().get(3, TimeUnit.SECONDS);
+            fail("The shutDownFuture should not be done.");
+        } catch (TimeoutException ignored) {
+

Review comment:
       We can speed this test up by assuming that, in the failing scenario, the 
shutdown future would have been completed before the JobManagerRunner (JMR) terminates.
   
   ```suggestion
               testingJobManagerRunner.getTerminationFuture().get();
               
Assertions.assertThat(miniDispatcher.getShutDownFuture()).isNotDone();
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import 
org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
+import 
org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory;
+import 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.rest.JobRestEndpointFactory;
+import 
org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static 
org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH;
+import static org.junit.Assert.assertNotNull;
+
+/** An integration test which recovers from checkpoint after regaining the 
leadership. */
+public class JobDispatcherITCase extends TestLogger {
+
+    private static final Duration TIMEOUT = Duration.ofMinutes(10);
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+    private Supplier<DispatcherResourceManagerComponentFactory>
+            createJobModeDispatcherResourceManagerComponentFactorySupplier(
+                    Configuration configuration) {
+        return () -> {
+            try {
+                return new DefaultDispatcherResourceManagerComponentFactory(
+                        new DefaultDispatcherRunnerFactory(
+                                
JobDispatcherLeaderProcessFactoryFactory.create(
+                                        
FileJobGraphRetriever.createFrom(configuration, null))),
+                        StandaloneResourceManagerFactory.getInstance(),
+                        JobRestEndpointFactory.INSTANCE);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        };
+    }
+
+    @Test
+    public void testRecoverFromCheckpointAfterLosingAndRegainingLeadership() 
throws Exception {
+        final Deadline deadline = Deadline.fromNow(TIMEOUT);
+        final Configuration configuration = new Configuration();
+        configuration.set(HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.ZOOKEEPER.name());
+        final TestingMiniClusterConfiguration clusterConfiguration =
+                TestingMiniClusterConfiguration.newBuilder()
+                        .setConfiguration(configuration)
+                        .build();
+        final EmbeddedHaServicesWithLeadershipControl haServices =
+                new 
EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor());
+
+        Configuration newConfiguration = new 
Configuration(clusterConfiguration.getConfiguration());
+        long checkpointInterval = 500;
+        JobID jobID = generateJobGraph(newConfiguration, checkpointInterval);

Review comment:
       nit: mark these locals as final and lower the checkpoint interval (500 -> 100)
   ```suggestion
           final Configuration newConfiguration = new 
Configuration(clusterConfiguration.getConfiguration());
           final long checkpointInterval = 100;
           final JobID jobID = generateJobGraph(newConfiguration, 
checkpointInterval);
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import 
org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
+import 
org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory;
+import 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.rest.JobRestEndpointFactory;
+import 
org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static 
org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH;
+import static org.junit.Assert.assertNotNull;
+
+/** An integration test which recovers from checkpoint after regaining the 
leadership. */
+public class JobDispatcherITCase extends TestLogger {
+
+    private static final Duration TIMEOUT = Duration.ofMinutes(10);
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+    private Supplier<DispatcherResourceManagerComponentFactory>
+            createJobModeDispatcherResourceManagerComponentFactorySupplier(
+                    Configuration configuration) {
+        return () -> {
+            try {
+                return new DefaultDispatcherResourceManagerComponentFactory(
+                        new DefaultDispatcherRunnerFactory(
+                                
JobDispatcherLeaderProcessFactoryFactory.create(
+                                        
FileJobGraphRetriever.createFrom(configuration, null))),
+                        StandaloneResourceManagerFactory.getInstance(),
+                        JobRestEndpointFactory.INSTANCE);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        };
+    }
+
+    @Test
+    public void testRecoverFromCheckpointAfterLosingAndRegainingLeadership() 
throws Exception {
+        final Deadline deadline = Deadline.fromNow(TIMEOUT);
+        final Configuration configuration = new Configuration();
+        configuration.set(HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.ZOOKEEPER.name());
+        final TestingMiniClusterConfiguration clusterConfiguration =
+                TestingMiniClusterConfiguration.newBuilder()
+                        .setConfiguration(configuration)
+                        .build();
+        final EmbeddedHaServicesWithLeadershipControl haServices =
+                new 
EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor());
+
+        Configuration newConfiguration = new 
Configuration(clusterConfiguration.getConfiguration());
+        long checkpointInterval = 500;
+        JobID jobID = generateJobGraph(newConfiguration, checkpointInterval);
+
+        final TestingMiniCluster.Builder clusterBuilder =
+                TestingMiniCluster.newBuilder(clusterConfiguration)
+                        .setHighAvailabilityServicesSupplier(() -> haServices)
+                        .setDispatcherResourceManagerComponentFactorySupplier(
+                                
createJobModeDispatcherResourceManagerComponentFactorySupplier(
+                                        newConfiguration));
+
+        try (final MiniCluster cluster = clusterBuilder.build()) {
+            // start mini cluster and submit the job
+            cluster.start();
+
+            // wait until job is running
+            awaitJobStatus(cluster, jobID, JobStatus.RUNNING, deadline);
+            CommonTestUtils.waitForAllTaskRunning(cluster, jobID, false);
+            CommonTestUtils.waitUntilCondition(
+                    () -> queryCompletedCheckpoints(cluster, jobID) > 0L,
+                    Deadline.fromNow(Duration.ofSeconds(30)),
+                    checkpointInterval / 2);
+
+            final CompletableFuture<JobResult> firstJobResult = 
cluster.requestJobResult(jobID);
+            haServices.revokeDispatcherLeadership();
+            // make sure the leadership is revoked to avoid race conditions
+            Assertions.assertEquals(
+                    ApplicationStatus.UNKNOWN, 
firstJobResult.get().getApplicationStatus());
+
+            haServices.grantDispatcherLeadership();
+
+            // job is suspended, wait until it's running
+            awaitJobStatus(cluster, jobID, JobStatus.RUNNING, deadline);
+
+            assertNotNull(
+                    cluster.getArchivedExecutionGraph(jobID)
+                            .get()
+                            .getCheckpointStatsSnapshot()
+                            .getLatestRestoredCheckpoint());
+
+            cluster.cancelJob(jobID);
+
+            // the cluster should shut down automatically once the application 
completes
+            CommonTestUtils.waitUntilCondition(() -> !cluster.isRunning(), 
deadline);
+        }
+    }
+
+    private JobID generateJobGraph(Configuration configuration, long 
checkpointInterval)

Review comment:
       I really like this! One small suggestion on the naming, to make it more 
explicit that we also write the JobGraph (JG) to disk:
   
   ```suggestion
       private JobID generateAndPersistJobGraph(Configuration configuration, 
long checkpointInterval)
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import 
org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
+import 
org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory;
+import 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.rest.JobRestEndpointFactory;
+import 
org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static 
org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH;
+import static org.junit.Assert.assertNotNull;
+
+/** An integration test which recovers from checkpoint after regaining the 
leadership. */
+public class JobDispatcherITCase extends TestLogger {

Review comment:
       nit: The community agreed that JUnit 5 should be used for new tests. 
For this test, that basically only means replacing the `TestLogger` base class 
with the `TestLoggerExtension`, switching the import for the `@Test` annotation, 
and injecting the temp directory as a parameter to the test case.
   
   ```suggestion
   @ExtendWith(TestLoggerExtension.class)
   public class JobDispatcherITCase {
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import 
org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
+import 
org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory;
+import 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.rest.JobRestEndpointFactory;
+import 
org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static 
org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH;
+import static org.junit.Assert.assertNotNull;
+
+/** An integration test which recovers from checkpoint after regaining the 
leadership. */
+public class JobDispatcherITCase extends TestLogger {
+
+    private static final Duration TIMEOUT = Duration.ofMinutes(10);
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+    private Supplier<DispatcherResourceManagerComponentFactory>
+            createJobModeDispatcherResourceManagerComponentFactorySupplier(
+                    Configuration configuration) {
+        return () -> {
+            try {
+                return new DefaultDispatcherResourceManagerComponentFactory(
+                        new DefaultDispatcherRunnerFactory(
+                                
JobDispatcherLeaderProcessFactoryFactory.create(
+                                        
FileJobGraphRetriever.createFrom(configuration, null))),
+                        StandaloneResourceManagerFactory.getInstance(),
+                        JobRestEndpointFactory.INSTANCE);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        };
+    }
+
+    @Test
+    public void testRecoverFromCheckpointAfterLosingAndRegainingLeadership() 
throws Exception {
+        final Deadline deadline = Deadline.fromNow(TIMEOUT);
+        final Configuration configuration = new Configuration();
+        configuration.set(HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.ZOOKEEPER.name());
+        final TestingMiniClusterConfiguration clusterConfiguration =
+                TestingMiniClusterConfiguration.newBuilder()
+                        .setConfiguration(configuration)
+                        .build();
+        final EmbeddedHaServicesWithLeadershipControl haServices =
+                new 
EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor());
+
+        Configuration newConfiguration = new 
Configuration(clusterConfiguration.getConfiguration());
+        long checkpointInterval = 500;
+        JobID jobID = generateJobGraph(newConfiguration, checkpointInterval);
+
+        final TestingMiniCluster.Builder clusterBuilder =
+                TestingMiniCluster.newBuilder(clusterConfiguration)
+                        .setHighAvailabilityServicesSupplier(() -> haServices)
+                        .setDispatcherResourceManagerComponentFactorySupplier(
+                                
createJobModeDispatcherResourceManagerComponentFactorySupplier(
+                                        newConfiguration));
+
+        try (final MiniCluster cluster = clusterBuilder.build()) {
+            // start mini cluster and submit the job
+            cluster.start();
+
+            // wait until job is running
+            awaitJobStatus(cluster, jobID, JobStatus.RUNNING, deadline);
+            CommonTestUtils.waitForAllTaskRunning(cluster, jobID, false);
+            CommonTestUtils.waitUntilCondition(
+                    () -> queryCompletedCheckpoints(cluster, jobID) > 0L,
+                    Deadline.fromNow(Duration.ofSeconds(30)),
+                    checkpointInterval / 2);
+
+            final CompletableFuture<JobResult> firstJobResult = 
cluster.requestJobResult(jobID);
+            haServices.revokeDispatcherLeadership();
+            // make sure the leadership is revoked to avoid race conditions
+            Assertions.assertEquals(
+                    ApplicationStatus.UNKNOWN, 
firstJobResult.get().getApplicationStatus());
+
+            haServices.grantDispatcherLeadership();
+
+            // job is suspended, wait until it's running
+            awaitJobStatus(cluster, jobID, JobStatus.RUNNING, deadline);
+
+            assertNotNull(
+                    cluster.getArchivedExecutionGraph(jobID)
+                            .get()
+                            .getCheckpointStatsSnapshot()
+                            .getLatestRestoredCheckpoint());
+
+            cluster.cancelJob(jobID);
+
+            // the cluster should shut down automatically once the application 
completes
+            CommonTestUtils.waitUntilCondition(() -> !cluster.isRunning(), 
deadline);

Review comment:
       Do we need to test for the automatic cluster shutdown here (it feels like we've already tested 
everything we needed to)? If not, we can remove it to speed the test up a little 
bit.
   
   ```suggestion
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import 
org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
+import 
org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory;
+import 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.rest.JobRestEndpointFactory;
+import 
org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static 
org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH;
+import static org.junit.Assert.assertNotNull;
+
+/** An integration test which recovers from checkpoint after regaining the 
leadership. */
+public class JobDispatcherITCase extends TestLogger {
+
+    private static final Duration TIMEOUT = Duration.ofMinutes(10);
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+    private Supplier<DispatcherResourceManagerComponentFactory>
+            createJobModeDispatcherResourceManagerComponentFactorySupplier(
+                    Configuration configuration) {
+        return () -> {
+            try {
+                return new DefaultDispatcherResourceManagerComponentFactory(
+                        new DefaultDispatcherRunnerFactory(
+                                
JobDispatcherLeaderProcessFactoryFactory.create(
+                                        
FileJobGraphRetriever.createFrom(configuration, null))),
+                        StandaloneResourceManagerFactory.getInstance(),
+                        JobRestEndpointFactory.INSTANCE);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        };
+    }
+
+    @Test
+    public void testRecoverFromCheckpointAfterLosingAndRegainingLeadership() 
throws Exception {
+        final Deadline deadline = Deadline.fromNow(TIMEOUT);
+        final Configuration configuration = new Configuration();
+        configuration.set(HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.ZOOKEEPER.name());
+        final TestingMiniClusterConfiguration clusterConfiguration =
+                TestingMiniClusterConfiguration.newBuilder()
+                        .setConfiguration(configuration)
+                        .build();
+        final EmbeddedHaServicesWithLeadershipControl haServices =
+                new 
EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor());
+
+        Configuration newConfiguration = new 
Configuration(clusterConfiguration.getConfiguration());
+        long checkpointInterval = 500;
+        JobID jobID = generateJobGraph(newConfiguration, checkpointInterval);
+
+        final TestingMiniCluster.Builder clusterBuilder =
+                TestingMiniCluster.newBuilder(clusterConfiguration)
+                        .setHighAvailabilityServicesSupplier(() -> haServices)
+                        .setDispatcherResourceManagerComponentFactorySupplier(
+                                
createJobModeDispatcherResourceManagerComponentFactorySupplier(
+                                        newConfiguration));
+

Review comment:
       (followup for the comment below)
   ```suggestion
   AtLeastOneCheckpointInvokable.reset()
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
##########
@@ -177,6 +177,34 @@ public void testTerminationAfterJobCompletion() throws 
Exception {
         }
     }
 
+    /**
+     * Tests that in detached mode, the {@link MiniDispatcher} will not 
complete the future that
+     * signals job termination if the JobStatus is not globally terminal state.
+     */
+    @Test
+    public void testNotTerminationWithoutGloballyTerminalState() throws 
Exception {

Review comment:
       👍 

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import 
org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
+import 
org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory;
+import 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.rest.JobRestEndpointFactory;
+import 
org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static 
org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH;
+import static org.junit.Assert.assertNotNull;
+
+/** An integration test which recovers from checkpoint after regaining the 
leadership. */
+public class JobDispatcherITCase extends TestLogger {
+
+    private static final Duration TIMEOUT = Duration.ofMinutes(10);
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+    private Supplier<DispatcherResourceManagerComponentFactory>
+            createJobModeDispatcherResourceManagerComponentFactorySupplier(
+                    Configuration configuration) {
+        return () -> {
+            try {
+                return new DefaultDispatcherResourceManagerComponentFactory(
+                        new DefaultDispatcherRunnerFactory(
+                                
JobDispatcherLeaderProcessFactoryFactory.create(
+                                        
FileJobGraphRetriever.createFrom(configuration, null))),
+                        StandaloneResourceManagerFactory.getInstance(),
+                        JobRestEndpointFactory.INSTANCE);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        };
+    }
+
+    @Test
+    public void testRecoverFromCheckpointAfterLosingAndRegainingLeadership() 
throws Exception {
+        final Deadline deadline = Deadline.fromNow(TIMEOUT);
+        final Configuration configuration = new Configuration();
+        configuration.set(HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.ZOOKEEPER.name());
+        final TestingMiniClusterConfiguration clusterConfiguration =
+                TestingMiniClusterConfiguration.newBuilder()
+                        .setConfiguration(configuration)
+                        .build();
+        final EmbeddedHaServicesWithLeadershipControl haServices =
+                new 
EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor());
+
+        Configuration newConfiguration = new 
Configuration(clusterConfiguration.getConfiguration());
+        long checkpointInterval = 500;
+        JobID jobID = generateJobGraph(newConfiguration, checkpointInterval);
+
+        final TestingMiniCluster.Builder clusterBuilder =
+                TestingMiniCluster.newBuilder(clusterConfiguration)
+                        .setHighAvailabilityServicesSupplier(() -> haServices)
+                        .setDispatcherResourceManagerComponentFactorySupplier(
+                                
createJobModeDispatcherResourceManagerComponentFactorySupplier(
+                                        newConfiguration));
+
+        try (final MiniCluster cluster = clusterBuilder.build()) {
+            // start mini cluster and submit the job
+            cluster.start();
+
+            // wait until job is running
+            awaitJobStatus(cluster, jobID, JobStatus.RUNNING, deadline);
+            CommonTestUtils.waitForAllTaskRunning(cluster, jobID, false);
+            CommonTestUtils.waitUntilCondition(
+                    () -> queryCompletedCheckpoints(cluster, jobID) > 0L,
+                    Deadline.fromNow(Duration.ofSeconds(30)),
+                    checkpointInterval / 2);

Review comment:
       ```suggestion
               
AtLeastOneCheckpointInvokable.AT_LEAST_ONE_CHECKPOINT_COMPLETED.await();
   ```
   
   I think we can simplify this a bit and get rid of a few busy-waiting loops by 
introducing a custom invokable that waits for at least one checkpoint to 
complete.
   
   ```
       /** An invokable that supports checkpointing and counts down when there 
is at least one checkpoint. */
       public static class AtLeastOneCheckpointInvokable extends 
AdaptiveSchedulerClusterITCase.CheckpointingNoOpInvokable {
   
           private static volatile CountDownLatch 
AT_LEAST_ONE_CHECKPOINT_COMPLETED;
   
           private static void reset() {
               AT_LEAST_ONE_CHECKPOINT_COMPLETED = new CountDownLatch(1);
           }
   
           public AtLeastOneCheckpointInvokable(Environment environment) {
               super(environment);
           }
   
           @Override
           public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) 
{
               AT_LEAST_ONE_CHECKPOINT_COMPLETED.countDown();
               return CompletableFuture.completedFuture(null);
           }
       }
    ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import 
org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
+import 
org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory;
+import 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.rest.JobRestEndpointFactory;
+import 
org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static 
org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH;
+import static org.junit.Assert.assertNotNull;
+
+/** An integration test which recovers from checkpoint after regaining the 
leadership. */
+public class JobDispatcherITCase extends TestLogger {
+
+    private static final Duration TIMEOUT = Duration.ofMinutes(10);
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+    private Supplier<DispatcherResourceManagerComponentFactory>
+            createJobModeDispatcherResourceManagerComponentFactorySupplier(
+                    Configuration configuration) {
+        return () -> {
+            try {
+                return new DefaultDispatcherResourceManagerComponentFactory(
+                        new DefaultDispatcherRunnerFactory(
+                                
JobDispatcherLeaderProcessFactoryFactory.create(
+                                        
FileJobGraphRetriever.createFrom(configuration, null))),
+                        StandaloneResourceManagerFactory.getInstance(),
+                        JobRestEndpointFactory.INSTANCE);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        };
+    }
+
+    @Test
+    public void testRecoverFromCheckpointAfterLosingAndRegainingLeadership() 
throws Exception {

Review comment:
       nit: follow up for the junit 5 comment
   ```suggestion
       public void 
testRecoverFromCheckpointAfterLosingAndRegainingLeadership(@TempDir Path tmp) 
throws Exception {
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to