dmvk commented on a change in pull request #16535:
URL: https://github.com/apache/flink/pull/16535#discussion_r677432796
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
##########
@@ -0,0 +1,205 @@
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneRunningJobsRegistry;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.TestingJobGraphStore;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TimeUtils;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+public class DispatcherFailoverITCase extends AbstractDispatcherTest {
+
+ private static final Time TIMEOUT = Time.seconds(1);
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ final CompletedCheckpointStore completedCheckpointStore =
+ new EmbeddedCompletedCheckpointStore();
+ haServices.setCheckpointRecoveryFactory(
+ PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs(
+ completedCheckpointStore, new
StandaloneCheckpointIDCounter()));
+ }
+
+ @Test
+ public void
testRecoverFromCheckpointAfterJobGraphRemovalOfTerminatedJobFailed()
+ throws Exception {
+ final JobGraph jobGraph = createJobGraph();
+ final JobID jobId = jobGraph.getJobID();
+
+ // Construct job graph store.
+ final Error jobGraphRemovalError = new Error("Unable to remove job
graph.");
+ final TestingJobGraphStore jobGraphStore =
+ TestingJobGraphStore.newBuilder()
+ .setRemoveJobGraphConsumer(
+ graph -> {
+ throw jobGraphRemovalError;
+ })
+ .build();
+ jobGraphStore.start(null);
+ haServices.setJobGraphStore(jobGraphStore);
+
+ // Construct leader election service.
+ final TestingLeaderElectionService leaderElectionService =
+ new TestingLeaderElectionService();
+ haServices.setJobMasterLeaderElectionService(jobId,
leaderElectionService);
+
+ // Start the first dispatcher and submit the job.
+ final CountDownLatch jobGraphRemovalErrorReceived = new
CountDownLatch(1);
+ final Dispatcher dispatcher =
+ createRecoveredDispatcher(
+ throwable -> {
+ final Optional<Error> maybeError =
+ ExceptionUtils.findThrowable(throwable,
Error.class);
+ if (maybeError.isPresent()
+ &&
jobGraphRemovalError.equals(maybeError.get())) {
+ jobGraphRemovalErrorReceived.countDown();
+ } else {
+ testingFatalErrorHandlerResource
+ .getFatalErrorHandler()
+ .onFatalError(throwable);
+ }
+ });
+ leaderElectionService.isLeader(UUID.randomUUID());
+ final DispatcherGateway dispatcherGateway =
+ dispatcher.getSelfGateway(DispatcherGateway.class);
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+ awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
+
+ // Run vertices, checkpoint and finish.
+ final JobMasterGateway jobMasterGateway =
dispatcher.getJobMasterGateway(jobId).get();
+ try (final JobMasterGatewayTester tester =
+ new JobMasterGatewayTester(rpcService, jobId,
jobMasterGateway)) {
+ final List<TaskDeploymentDescriptor> descriptors =
tester.deployVertices(2).get();
+ tester.transitionTo(descriptors,
ExecutionState.INITIALIZING).get();
+ tester.transitionTo(descriptors, ExecutionState.RUNNING).get();
+ tester.awaitCheckpoint(1L).get();
+ tester.transitionTo(descriptors, ExecutionState.FINISHED).get();
+ }
+ awaitStatus(dispatcherGateway, jobId, JobStatus.FINISHED);
+ assertTrue(jobGraphRemovalErrorReceived.await(5, TimeUnit.SECONDS));
+
+ // First dispatcher is in a weird state (in a real-world scenario, we'd just kill the whole
+ // process), so just remove its leadership for now, so that no extra cleanup is performed.
+ leaderElectionService.stop();
+
+ // Run a second dispatcher, that restores our finished job.
+ final Dispatcher secondDispatcher = createRecoveredDispatcher(null);
+ final DispatcherGateway secondDispatcherGateway =
+ secondDispatcher.getSelfGateway(DispatcherGateway.class);
+ leaderElectionService.isLeader(UUID.randomUUID());
+ awaitStatus(secondDispatcherGateway, jobId, JobStatus.RUNNING);
+
+ // Now make sure that restored job started from checkpoint.
+ final JobMasterGateway secondJobMasterGateway =
+ secondDispatcher.getJobMasterGateway(jobId).get();
Review comment:
fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]