dmvk commented on a change in pull request #18296: URL: https://github.com/apache/flink/pull/18296#discussion_r794265194
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory; +import org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory; +import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever; +import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl; +import org.apache.flink.runtime.jobgraph.JobGraph; +import 
org.apache.flink.runtime.jobgraph.JobGraphBuilder; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.TestingMiniCluster; +import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration; +import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory; +import org.apache.flink.runtime.rest.JobRestEndpointFactory; +import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; + +import static java.nio.file.StandardOpenOption.CREATE; +import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH; +import static org.junit.Assert.assertNotNull; + +/** An integration test which recovers from checkpoint after regaining the leadership. 
*/ +public class JobDispatcherITCase extends TestLogger { + + private static final Duration TIMEOUT = Duration.ofMinutes(10); + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private Supplier<DispatcherResourceManagerComponentFactory> + createJobModeDispatcherResourceManagerComponentFactorySupplier( + Configuration configuration) { + return () -> { + try { + return new DefaultDispatcherResourceManagerComponentFactory( + new DefaultDispatcherRunnerFactory( + JobDispatcherLeaderProcessFactoryFactory.create( + FileJobGraphRetriever.createFrom(configuration, null))), + StandaloneResourceManagerFactory.getInstance(), + JobRestEndpointFactory.INSTANCE); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + } + + @Test + public void testRecoverFromCheckpointAfterLosingAndRegainingLeadership() throws Exception { + final Deadline deadline = Deadline.fromNow(TIMEOUT); + final Configuration configuration = new Configuration(); + configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name()); + final TestingMiniClusterConfiguration clusterConfiguration = + TestingMiniClusterConfiguration.newBuilder() + .setConfiguration(configuration) + .build(); + final EmbeddedHaServicesWithLeadershipControl haServices = + new EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor()); + + Configuration newConfiguration = new Configuration(clusterConfiguration.getConfiguration()); + long checkpointInterval = 500; + JobID jobID = generateJobGraph(newConfiguration, checkpointInterval); + + final TestingMiniCluster.Builder clusterBuilder = + TestingMiniCluster.newBuilder(clusterConfiguration) + .setHighAvailabilityServicesSupplier(() -> haServices) + .setDispatcherResourceManagerComponentFactorySupplier( + createJobModeDispatcherResourceManagerComponentFactorySupplier( + newConfiguration)); + + try (final MiniCluster cluster = clusterBuilder.build()) { + // start mini cluster and submit the 
job + cluster.start(); + + // wait until job is running + awaitJobStatus(cluster, jobID, JobStatus.RUNNING, deadline); + CommonTestUtils.waitForAllTaskRunning(cluster, jobID, false); + CommonTestUtils.waitUntilCondition( + () -> queryCompletedCheckpoints(cluster, jobID) > 0L, + Deadline.fromNow(Duration.ofSeconds(30)), + checkpointInterval / 2); + + final CompletableFuture<JobResult> firstJobResult = cluster.requestJobResult(jobID); + haServices.revokeDispatcherLeadership(); + // make sure the leadership is revoked to avoid race conditions + Assertions.assertEquals( + ApplicationStatus.UNKNOWN, firstJobResult.get().getApplicationStatus()); + + haServices.grantDispatcherLeadership(); + + // job is suspended, wait until it's running + awaitJobStatus(cluster, jobID, JobStatus.RUNNING, deadline); + + assertNotNull( + cluster.getArchivedExecutionGraph(jobID) + .get() + .getCheckpointStatsSnapshot() + .getLatestRestoredCheckpoint()); + + cluster.cancelJob(jobID); + + // the cluster should shut down automatically once the application completes + CommonTestUtils.waitUntilCondition(() -> !cluster.isRunning(), deadline); + } + } + + private JobID generateJobGraph(Configuration configuration, long checkpointInterval) + throws Exception { + final JobVertex jobVertex = new JobVertex("jobVertex"); + jobVertex.setInvokableClass( + AdaptiveSchedulerClusterITCase.CheckpointingNoOpInvokable.class); + jobVertex.setParallelism(1); + + final CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = + CheckpointCoordinatorConfiguration.builder() + .setCheckpointInterval(checkpointInterval) + .build(); + final JobCheckpointingSettings checkpointingSettings = + new JobCheckpointingSettings(checkpointCoordinatorConfiguration, null); + JobGraph jobGraph = Review comment: nit: all other fields are marked final ```suggestion final JobGraph jobGraph = ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java ########## @@ 
-201,6 +203,37 @@ public void testTerminationAfterJobCompletion() throws Exception { } } + /** + * Tests that in detached mode, the {@link MiniDispatcher} will not complete the future that + * signals job termination if the JobStatus is not globally terminal state. + */ + @Test + public void testNotTerminationWithoutGloballyTerminalState() throws Exception { + final MiniDispatcher miniDispatcher = + createMiniDispatcher(ClusterEntrypoint.ExecutionMode.DETACHED); + miniDispatcher.start(); + + try { + // wait until we have submitted the job + final TestingJobManagerRunner testingJobManagerRunner = + testingJobManagerRunnerFactory.takeCreatedJobManagerRunner(); + + testingJobManagerRunner.completeResultFuture( + new ExecutionGraphInfo( + new ArchivedExecutionGraphBuilder() + .setJobID(jobGraph.getJobID()) + .setState(JobStatus.SUSPENDED) + .build())); + + miniDispatcher.getShutDownFuture().get(3, TimeUnit.SECONDS); + fail("The shutDownFuture should not be done."); + } catch (TimeoutException ignored) { + Review comment: We can speed this test up (instead of waiting for the 3-second timeout) by assuming that, in the failing scenario, the shutdown future would already have been completed before the JobManagerRunner (JMR) terminates: ```suggestion testingJobManagerRunner.getTerminationFuture().get(); Assertions.assertThat(miniDispatcher.getShutDownFuture()).isNotDone(); ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory; +import org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory; +import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever; +import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphBuilder; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.TestingMiniCluster; 
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration; +import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory; +import org.apache.flink.runtime.rest.JobRestEndpointFactory; +import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; + +import static java.nio.file.StandardOpenOption.CREATE; +import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH; +import static org.junit.Assert.assertNotNull; + +/** An integration test which recovers from checkpoint after regaining the leadership. 
*/ +public class JobDispatcherITCase extends TestLogger { + + private static final Duration TIMEOUT = Duration.ofMinutes(10); + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private Supplier<DispatcherResourceManagerComponentFactory> + createJobModeDispatcherResourceManagerComponentFactorySupplier( + Configuration configuration) { + return () -> { + try { + return new DefaultDispatcherResourceManagerComponentFactory( + new DefaultDispatcherRunnerFactory( + JobDispatcherLeaderProcessFactoryFactory.create( + FileJobGraphRetriever.createFrom(configuration, null))), + StandaloneResourceManagerFactory.getInstance(), + JobRestEndpointFactory.INSTANCE); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + } + + @Test + public void testRecoverFromCheckpointAfterLosingAndRegainingLeadership() throws Exception { + final Deadline deadline = Deadline.fromNow(TIMEOUT); + final Configuration configuration = new Configuration(); + configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name()); + final TestingMiniClusterConfiguration clusterConfiguration = + TestingMiniClusterConfiguration.newBuilder() + .setConfiguration(configuration) + .build(); + final EmbeddedHaServicesWithLeadershipControl haServices = + new EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor()); + + Configuration newConfiguration = new Configuration(clusterConfiguration.getConfiguration()); + long checkpointInterval = 500; + JobID jobID = generateJobGraph(newConfiguration, checkpointInterval); Review comment: nit: finals + lower the checkpoint interval ```suggestion final Configuration newConfiguration = new Configuration(clusterConfiguration.getConfiguration()); final long checkpointInterval = 100; final JobID jobID = generateJobGraph(newConfiguration, checkpointInterval); ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java ########## 
@@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory; +import org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory; +import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever; +import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphBuilder; +import org.apache.flink.runtime.jobgraph.JobVertex; +import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.TestingMiniCluster; +import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration; +import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory; +import org.apache.flink.runtime.rest.JobRestEndpointFactory; +import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; + +import static java.nio.file.StandardOpenOption.CREATE; +import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH; +import static org.junit.Assert.assertNotNull; + +/** An integration test which recovers from checkpoint after regaining the leadership. 
*/ +public class JobDispatcherITCase extends TestLogger { + + private static final Duration TIMEOUT = Duration.ofMinutes(10); + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private Supplier<DispatcherResourceManagerComponentFactory> + createJobModeDispatcherResourceManagerComponentFactorySupplier( + Configuration configuration) { + return () -> { + try { + return new DefaultDispatcherResourceManagerComponentFactory( + new DefaultDispatcherRunnerFactory( + JobDispatcherLeaderProcessFactoryFactory.create( + FileJobGraphRetriever.createFrom(configuration, null))), + StandaloneResourceManagerFactory.getInstance(), + JobRestEndpointFactory.INSTANCE); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + } + + @Test + public void testRecoverFromCheckpointAfterLosingAndRegainingLeadership() throws Exception { + final Deadline deadline = Deadline.fromNow(TIMEOUT); + final Configuration configuration = new Configuration(); + configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name()); + final TestingMiniClusterConfiguration clusterConfiguration = + TestingMiniClusterConfiguration.newBuilder() + .setConfiguration(configuration) + .build(); + final EmbeddedHaServicesWithLeadershipControl haServices = + new EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor()); + + Configuration newConfiguration = new Configuration(clusterConfiguration.getConfiguration()); + long checkpointInterval = 500; + JobID jobID = generateJobGraph(newConfiguration, checkpointInterval); + + final TestingMiniCluster.Builder clusterBuilder = + TestingMiniCluster.newBuilder(clusterConfiguration) + .setHighAvailabilityServicesSupplier(() -> haServices) + .setDispatcherResourceManagerComponentFactorySupplier( + createJobModeDispatcherResourceManagerComponentFactorySupplier( + newConfiguration)); + + try (final MiniCluster cluster = clusterBuilder.build()) { + // start mini cluster and submit the 
job + cluster.start(); + + // wait until job is running + awaitJobStatus(cluster, jobID, JobStatus.RUNNING, deadline); + CommonTestUtils.waitForAllTaskRunning(cluster, jobID, false); + CommonTestUtils.waitUntilCondition( + () -> queryCompletedCheckpoints(cluster, jobID) > 0L, + Deadline.fromNow(Duration.ofSeconds(30)), + checkpointInterval / 2); + + final CompletableFuture<JobResult> firstJobResult = cluster.requestJobResult(jobID); + haServices.revokeDispatcherLeadership(); + // make sure the leadership is revoked to avoid race conditions + Assertions.assertEquals( + ApplicationStatus.UNKNOWN, firstJobResult.get().getApplicationStatus()); + + haServices.grantDispatcherLeadership(); + + // job is suspended, wait until it's running + awaitJobStatus(cluster, jobID, JobStatus.RUNNING, deadline); + + assertNotNull( + cluster.getArchivedExecutionGraph(jobID) + .get() + .getCheckpointStatsSnapshot() + .getLatestRestoredCheckpoint()); + + cluster.cancelJob(jobID); + + // the cluster should shut down automatically once the application completes + CommonTestUtils.waitUntilCondition(() -> !cluster.isRunning(), deadline); + } + } + + private JobID generateJobGraph(Configuration configuration, long checkpointInterval) Review comment: I really like this! Maybe a small suggestion on the naming to make the fact that we write the JG to the disk more explicit ```suggestion private JobID generateAndPersistJobGraph(Configuration configuration, long checkpointInterval) ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory; +import org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory; +import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever; +import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphBuilder; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.jobmaster.JobResult; +import 
org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.TestingMiniCluster; +import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration; +import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory; +import org.apache.flink.runtime.rest.JobRestEndpointFactory; +import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; + +import static java.nio.file.StandardOpenOption.CREATE; +import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH; +import static org.junit.Assert.assertNotNull; + +/** An integration test which recovers from checkpoint after regaining the leadership. */ +public class JobDispatcherITCase extends TestLogger { Review comment: nit: The community agreed that JUnit 5 should be used for new tests. For this test, that basically only means replacing the `TestLogger` base class with the `TestLoggerExtension`, switching the import for the `@Test` annotation, and injecting the temporary directory as a parameter to the test case.
```suggestion @ExtendWith(TestLoggerExtension.class) public class JobDispatcherITCase { ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory; +import org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory; +import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever; +import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphBuilder; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.TestingMiniCluster; +import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration; +import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory; +import org.apache.flink.runtime.rest.JobRestEndpointFactory; +import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.testutils.TestingUtils; 
+import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; + +import static java.nio.file.StandardOpenOption.CREATE; +import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH; +import static org.junit.Assert.assertNotNull; + +/** An integration test which recovers from checkpoint after regaining the leadership. */ +public class JobDispatcherITCase extends TestLogger { + + private static final Duration TIMEOUT = Duration.ofMinutes(10); + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private Supplier<DispatcherResourceManagerComponentFactory> + createJobModeDispatcherResourceManagerComponentFactorySupplier( + Configuration configuration) { + return () -> { + try { + return new DefaultDispatcherResourceManagerComponentFactory( + new DefaultDispatcherRunnerFactory( + JobDispatcherLeaderProcessFactoryFactory.create( + FileJobGraphRetriever.createFrom(configuration, null))), + StandaloneResourceManagerFactory.getInstance(), + JobRestEndpointFactory.INSTANCE); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + } + + @Test + public void testRecoverFromCheckpointAfterLosingAndRegainingLeadership() throws Exception { + final Deadline deadline = Deadline.fromNow(TIMEOUT); + final Configuration configuration = new Configuration(); + configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name()); + final TestingMiniClusterConfiguration clusterConfiguration = + 
TestingMiniClusterConfiguration.newBuilder() + .setConfiguration(configuration) + .build(); + final EmbeddedHaServicesWithLeadershipControl haServices = + new EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor()); + + Configuration newConfiguration = new Configuration(clusterConfiguration.getConfiguration()); + long checkpointInterval = 500; + JobID jobID = generateJobGraph(newConfiguration, checkpointInterval); + + final TestingMiniCluster.Builder clusterBuilder = + TestingMiniCluster.newBuilder(clusterConfiguration) + .setHighAvailabilityServicesSupplier(() -> haServices) + .setDispatcherResourceManagerComponentFactorySupplier( + createJobModeDispatcherResourceManagerComponentFactorySupplier( + newConfiguration)); + + try (final MiniCluster cluster = clusterBuilder.build()) { + // start mini cluster and submit the job + cluster.start(); + + // wait until job is running + awaitJobStatus(cluster, jobID, JobStatus.RUNNING, deadline); + CommonTestUtils.waitForAllTaskRunning(cluster, jobID, false); + CommonTestUtils.waitUntilCondition( + () -> queryCompletedCheckpoints(cluster, jobID) > 0L, + Deadline.fromNow(Duration.ofSeconds(30)), + checkpointInterval / 2); + + final CompletableFuture<JobResult> firstJobResult = cluster.requestJobResult(jobID); + haServices.revokeDispatcherLeadership(); + // make sure the leadership is revoked to avoid race conditions + Assertions.assertEquals( + ApplicationStatus.UNKNOWN, firstJobResult.get().getApplicationStatus()); + + haServices.grantDispatcherLeadership(); + + // job is suspended, wait until it's running + awaitJobStatus(cluster, jobID, JobStatus.RUNNING, deadline); + + assertNotNull( + cluster.getArchivedExecutionGraph(jobID) + .get() + .getCheckpointStatsSnapshot() + .getLatestRestoredCheckpoint()); + + cluster.cancelJob(jobID); + + // the cluster should shut down automatically once the application completes + CommonTestUtils.waitUntilCondition(() -> !cluster.isRunning(), deadline); Review comment: Do we 
need to test for this (it feels like we've already tested everything we needed to)? If not, we can remove it to speed the test up a little bit. ```suggestion ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory; +import org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory; +import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever; +import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphBuilder; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.TestingMiniCluster; +import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration; +import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory; +import org.apache.flink.runtime.rest.JobRestEndpointFactory; +import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.testutils.TestingUtils; 
+import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; + +import static java.nio.file.StandardOpenOption.CREATE; +import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH; +import static org.junit.Assert.assertNotNull; + +/** An integration test which recovers from checkpoint after regaining the leadership. */ +public class JobDispatcherITCase extends TestLogger { + + private static final Duration TIMEOUT = Duration.ofMinutes(10); + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private Supplier<DispatcherResourceManagerComponentFactory> + createJobModeDispatcherResourceManagerComponentFactorySupplier( + Configuration configuration) { + return () -> { + try { + return new DefaultDispatcherResourceManagerComponentFactory( + new DefaultDispatcherRunnerFactory( + JobDispatcherLeaderProcessFactoryFactory.create( + FileJobGraphRetriever.createFrom(configuration, null))), + StandaloneResourceManagerFactory.getInstance(), + JobRestEndpointFactory.INSTANCE); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + } + + @Test + public void testRecoverFromCheckpointAfterLosingAndRegainingLeadership() throws Exception { + final Deadline deadline = Deadline.fromNow(TIMEOUT); + final Configuration configuration = new Configuration(); + configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name()); + final TestingMiniClusterConfiguration clusterConfiguration = + 
TestingMiniClusterConfiguration.newBuilder() + .setConfiguration(configuration) + .build(); + final EmbeddedHaServicesWithLeadershipControl haServices = + new EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor()); + + Configuration newConfiguration = new Configuration(clusterConfiguration.getConfiguration()); + long checkpointInterval = 500; + JobID jobID = generateJobGraph(newConfiguration, checkpointInterval); + + final TestingMiniCluster.Builder clusterBuilder = + TestingMiniCluster.newBuilder(clusterConfiguration) + .setHighAvailabilityServicesSupplier(() -> haServices) + .setDispatcherResourceManagerComponentFactorySupplier( + createJobModeDispatcherResourceManagerComponentFactorySupplier( + newConfiguration)); + Review comment: (follow-up to the comment below) ```suggestion AtLeastOneCheckpointInvokable.reset(); ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java ########## @@ -177,6 +177,34 @@ public void testTerminationAfterJobCompletion() throws Exception { } } + /** + * Tests that in detached mode, the {@link MiniDispatcher} will not complete the future that + * signals job termination if the JobStatus is not globally terminal state. + */ + @Test + public void testNotTerminationWithoutGloballyTerminalState() throws Exception { Review comment: 👍 ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory; +import org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory; +import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever; +import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphBuilder; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.TestingMiniCluster; 
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration; +import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory; +import org.apache.flink.runtime.rest.JobRestEndpointFactory; +import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; + +import static java.nio.file.StandardOpenOption.CREATE; +import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH; +import static org.junit.Assert.assertNotNull; + +/** An integration test which recovers from checkpoint after regaining the leadership. 
*/ +public class JobDispatcherITCase extends TestLogger { + + private static final Duration TIMEOUT = Duration.ofMinutes(10); + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private Supplier<DispatcherResourceManagerComponentFactory> + createJobModeDispatcherResourceManagerComponentFactorySupplier( + Configuration configuration) { + return () -> { + try { + return new DefaultDispatcherResourceManagerComponentFactory( + new DefaultDispatcherRunnerFactory( + JobDispatcherLeaderProcessFactoryFactory.create( + FileJobGraphRetriever.createFrom(configuration, null))), + StandaloneResourceManagerFactory.getInstance(), + JobRestEndpointFactory.INSTANCE); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + } + + @Test + public void testRecoverFromCheckpointAfterLosingAndRegainingLeadership() throws Exception { + final Deadline deadline = Deadline.fromNow(TIMEOUT); + final Configuration configuration = new Configuration(); + configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name()); + final TestingMiniClusterConfiguration clusterConfiguration = + TestingMiniClusterConfiguration.newBuilder() + .setConfiguration(configuration) + .build(); + final EmbeddedHaServicesWithLeadershipControl haServices = + new EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor()); + + Configuration newConfiguration = new Configuration(clusterConfiguration.getConfiguration()); + long checkpointInterval = 500; + JobID jobID = generateJobGraph(newConfiguration, checkpointInterval); + + final TestingMiniCluster.Builder clusterBuilder = + TestingMiniCluster.newBuilder(clusterConfiguration) + .setHighAvailabilityServicesSupplier(() -> haServices) + .setDispatcherResourceManagerComponentFactorySupplier( + createJobModeDispatcherResourceManagerComponentFactorySupplier( + newConfiguration)); + + try (final MiniCluster cluster = clusterBuilder.build()) { + // start mini cluster and submit the 
job + cluster.start(); + + // wait until job is running + awaitJobStatus(cluster, jobID, JobStatus.RUNNING, deadline); + CommonTestUtils.waitForAllTaskRunning(cluster, jobID, false); + CommonTestUtils.waitUntilCondition( + () -> queryCompletedCheckpoints(cluster, jobID) > 0L, + Deadline.fromNow(Duration.ofSeconds(30)), + checkpointInterval / 2); Review comment: ```suggestion AtLeastOneCheckpointInvokable.AT_LEAST_ONE_CHECKPOINT_COMPLETED.await(); ``` I think we can simplify this a bit and get rid of a few busy-waiting loops by introducing a custom invokable that waits for at least one checkpoint to complete. ``` /** An invokable that supports checkpointing and counts down a latch once at least one checkpoint has completed. */ public static class AtLeastOneCheckpointInvokable extends AdaptiveSchedulerClusterITCase.CheckpointingNoOpInvokable { private static volatile CountDownLatch AT_LEAST_ONE_CHECKPOINT_COMPLETED; private static void reset() { AT_LEAST_ONE_CHECKPOINT_COMPLETED = new CountDownLatch(1); } public AtLeastOneCheckpointInvokable(Environment environment) { super(environment); } @Override public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) { AT_LEAST_ONE_CHECKPOINT_COMPLETED.countDown(); return CompletableFuture.completedFuture(null); } } ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory; +import org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory; +import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever; +import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphBuilder; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.TestingMiniCluster; 
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration; +import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory; +import org.apache.flink.runtime.rest.JobRestEndpointFactory; +import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; + +import static java.nio.file.StandardOpenOption.CREATE; +import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH; +import static org.junit.Assert.assertNotNull; + +/** An integration test which recovers from checkpoint after regaining the leadership. 
*/ +public class JobDispatcherITCase extends TestLogger { + + private static final Duration TIMEOUT = Duration.ofMinutes(10); + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private Supplier<DispatcherResourceManagerComponentFactory> + createJobModeDispatcherResourceManagerComponentFactorySupplier( + Configuration configuration) { + return () -> { + try { + return new DefaultDispatcherResourceManagerComponentFactory( + new DefaultDispatcherRunnerFactory( + JobDispatcherLeaderProcessFactoryFactory.create( + FileJobGraphRetriever.createFrom(configuration, null))), + StandaloneResourceManagerFactory.getInstance(), + JobRestEndpointFactory.INSTANCE); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + } + + @Test + public void testRecoverFromCheckpointAfterLosingAndRegainingLeadership() throws Exception { Review comment: nit: follow-up to the JUnit 5 comment ```suggestion public void testRecoverFromCheckpointAfterLosingAndRegainingLeadership(@TempDir Path tmp) throws Exception { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
