tillrohrmann commented on a change in pull request #16535:
URL: https://github.com/apache/flink/pull/16535#discussion_r675614877
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
##########
@@ -0,0 +1,205 @@
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneRunningJobsRegistry;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.TestingJobGraphStore;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TimeUtils;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+public class DispatcherFailoverITCase extends AbstractDispatcherTest {
+
+ private static final Time TIMEOUT = Time.seconds(1);
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ final CompletedCheckpointStore completedCheckpointStore =
+ new EmbeddedCompletedCheckpointStore();
+ haServices.setCheckpointRecoveryFactory(
+ PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs(
+ completedCheckpointStore, new
StandaloneCheckpointIDCounter()));
+ }
+
+ @Test
+ public void
testRecoverFromCheckpointAfterJobGraphRemovalOfTerminatedJobFailed()
+ throws Exception {
+ final JobGraph jobGraph = createJobGraph();
+ final JobID jobId = jobGraph.getJobID();
+
+ // Construct job graph store.
+ final Error jobGraphRemovalError = new Error("Unable to remove job
graph.");
+ final TestingJobGraphStore jobGraphStore =
+ TestingJobGraphStore.newBuilder()
+ .setRemoveJobGraphConsumer(
+ graph -> {
+ throw jobGraphRemovalError;
+ })
+ .build();
+ jobGraphStore.start(null);
+ haServices.setJobGraphStore(jobGraphStore);
+
+ // Construct leader election service.
+ final TestingLeaderElectionService leaderElectionService =
+ new TestingLeaderElectionService();
+ haServices.setJobMasterLeaderElectionService(jobId,
leaderElectionService);
+
+ // Start the first dispatcher and submit the job.
+ final CountDownLatch jobGraphRemovalErrorReceived = new
CountDownLatch(1);
+ final Dispatcher dispatcher =
+ createRecoveredDispatcher(
+ throwable -> {
+ final Optional<Error> maybeError =
+ ExceptionUtils.findThrowable(throwable,
Error.class);
+ if (maybeError.isPresent()
+ &&
jobGraphRemovalError.equals(maybeError.get())) {
+ jobGraphRemovalErrorReceived.countDown();
+ } else {
+ testingFatalErrorHandlerResource
+ .getFatalErrorHandler()
+ .onFatalError(throwable);
+ }
+ });
+ leaderElectionService.isLeader(UUID.randomUUID());
+ final DispatcherGateway dispatcherGateway =
+ dispatcher.getSelfGateway(DispatcherGateway.class);
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+ awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
+
+ // Run vertices, checkpoint and finish.
+ final JobMasterGateway jobMasterGateway =
dispatcher.getJobMasterGateway(jobId).get();
+ try (final JobMasterGatewayTester tester =
+ new JobMasterGatewayTester(rpcService, jobId,
jobMasterGateway)) {
+ final List<TaskDeploymentDescriptor> descriptors =
tester.deployVertices(2).get();
+ tester.transitionTo(descriptors,
ExecutionState.INITIALIZING).get();
+ tester.transitionTo(descriptors, ExecutionState.RUNNING).get();
+ tester.awaitCheckpoint(1L).get();
+ tester.transitionTo(descriptors, ExecutionState.FINISHED).get();
+ }
+ awaitStatus(dispatcherGateway, jobId, JobStatus.FINISHED);
+ assertTrue(jobGraphRemovalErrorReceived.await(5, TimeUnit.SECONDS));
+
+ // First dispatcher is in a weird state (in a real world scenario,
we'd just kill the whole
+ // process), just remove it's leadership for now, so no extra cleanup
is performed
+ leaderElectionService.stop();
+
+ // Run a second dispatcher, that restores our finished job.
+ final Dispatcher secondDispatcher = createRecoveredDispatcher(null);
+ final DispatcherGateway secondDispatcherGateway =
+ secondDispatcher.getSelfGateway(DispatcherGateway.class);
+ leaderElectionService.isLeader(UUID.randomUUID());
+ awaitStatus(secondDispatcherGateway, jobId, JobStatus.RUNNING);
+
+ // Now make sure that restored job started from checkpoint.
+ final JobMasterGateway secondJobMasterGateway =
+ secondDispatcher.getJobMasterGateway(jobId).get();
Review comment:
This call is again problematic because of the lack of synchronization. The
thing is that an `RpcEndpoint` is not intended to be accessed from any thread
other than its main thread.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
##########
@@ -0,0 +1,205 @@
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneRunningJobsRegistry;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.TestingJobGraphStore;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TimeUtils;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+public class DispatcherFailoverITCase extends AbstractDispatcherTest {
+
+ private static final Time TIMEOUT = Time.seconds(1);
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ final CompletedCheckpointStore completedCheckpointStore =
+ new EmbeddedCompletedCheckpointStore();
+ haServices.setCheckpointRecoveryFactory(
+ PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs(
+ completedCheckpointStore, new
StandaloneCheckpointIDCounter()));
+ }
+
+ @Test
+ public void
testRecoverFromCheckpointAfterJobGraphRemovalOfTerminatedJobFailed()
+ throws Exception {
+ final JobGraph jobGraph = createJobGraph();
+ final JobID jobId = jobGraph.getJobID();
+
+ // Construct job graph store.
+ final Error jobGraphRemovalError = new Error("Unable to remove job
graph.");
+ final TestingJobGraphStore jobGraphStore =
+ TestingJobGraphStore.newBuilder()
+ .setRemoveJobGraphConsumer(
+ graph -> {
+ throw jobGraphRemovalError;
+ })
+ .build();
+ jobGraphStore.start(null);
+ haServices.setJobGraphStore(jobGraphStore);
+
+ // Construct leader election service.
+ final TestingLeaderElectionService leaderElectionService =
+ new TestingLeaderElectionService();
+ haServices.setJobMasterLeaderElectionService(jobId,
leaderElectionService);
+
+ // Start the first dispatcher and submit the job.
+ final CountDownLatch jobGraphRemovalErrorReceived = new
CountDownLatch(1);
+ final Dispatcher dispatcher =
+ createRecoveredDispatcher(
+ throwable -> {
+ final Optional<Error> maybeError =
+ ExceptionUtils.findThrowable(throwable,
Error.class);
+ if (maybeError.isPresent()
+ &&
jobGraphRemovalError.equals(maybeError.get())) {
+ jobGraphRemovalErrorReceived.countDown();
+ } else {
+ testingFatalErrorHandlerResource
+ .getFatalErrorHandler()
+ .onFatalError(throwable);
+ }
+ });
+ leaderElectionService.isLeader(UUID.randomUUID());
+ final DispatcherGateway dispatcherGateway =
+ dispatcher.getSelfGateway(DispatcherGateway.class);
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+ awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
+
+ // Run vertices, checkpoint and finish.
+ final JobMasterGateway jobMasterGateway =
dispatcher.getJobMasterGateway(jobId).get();
+ try (final JobMasterGatewayTester tester =
+ new JobMasterGatewayTester(rpcService, jobId,
jobMasterGateway)) {
+ final List<TaskDeploymentDescriptor> descriptors =
tester.deployVertices(2).get();
+ tester.transitionTo(descriptors,
ExecutionState.INITIALIZING).get();
+ tester.transitionTo(descriptors, ExecutionState.RUNNING).get();
+ tester.awaitCheckpoint(1L).get();
+ tester.transitionTo(descriptors, ExecutionState.FINISHED).get();
+ }
+ awaitStatus(dispatcherGateway, jobId, JobStatus.FINISHED);
+ assertTrue(jobGraphRemovalErrorReceived.await(5, TimeUnit.SECONDS));
+
+ // First dispatcher is in a weird state (in a real world scenario,
we'd just kill the whole
+ // process), just remove it's leadership for now, so no extra cleanup
is performed
+ leaderElectionService.stop();
+
+ // Run a second dispatcher, that restores our finished job.
+ final Dispatcher secondDispatcher = createRecoveredDispatcher(null);
+ final DispatcherGateway secondDispatcherGateway =
+ secondDispatcher.getSelfGateway(DispatcherGateway.class);
+ leaderElectionService.isLeader(UUID.randomUUID());
+ awaitStatus(secondDispatcherGateway, jobId, JobStatus.RUNNING);
+
+ // Now make sure that restored job started from checkpoint.
+ final JobMasterGateway secondJobMasterGateway =
+ secondDispatcher.getJobMasterGateway(jobId).get();
Review comment:
A safer way would be to listen to the published leader information via
the `LeaderRetrievalService` and then to use `rpcService.connect(address,
JobMasterGateway.class)`. You could use the `LeaderRetrievalUtils` for this.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobMasterGatewayTester.java
##########
@@ -0,0 +1,262 @@
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class JobMasterGatewayTester implements Closeable {
+
+ private static final Time TIMEOUT = Time.seconds(1);
+
+ private static TaskStateSnapshot
createNonEmptyStateSnapshot(TaskInformation taskInformation) {
+ final TaskStateSnapshot checkpointStateHandles = new
TaskStateSnapshot();
+ checkpointStateHandles.putSubtaskStateByOperatorID(
+ OperatorID.fromJobVertexID(taskInformation.getJobVertexId()),
+ OperatorSubtaskState.builder()
+ .setManagedOperatorState(
+ new OperatorStreamStateHandle(
+ Collections.emptyMap(),
+ new ByteStreamStateHandle("foobar",
new byte[0])))
+ .build());
+ return checkpointStateHandles;
+ }
+
+ private static class CheckpointCompletionHandler {
+
+ private final Map<ExecutionAttemptID, CompletableFuture<Void>>
completedAttemptFutures;
+ private final CompletableFuture<Void> completedFuture;
+
+ public CheckpointCompletionHandler(List<TaskDeploymentDescriptor>
descriptors) {
+ this.completedAttemptFutures =
+ descriptors.stream()
+
.map(TaskDeploymentDescriptor::getExecutionAttemptId)
+ .collect(
+ Collectors.toMap(
+ Function.identity(),
+ ignored -> new
CompletableFuture<>()));
+ this.completedFuture =
FutureUtils.completeAll(completedAttemptFutures.values());
+ }
+
+ void completeAttempt(ExecutionAttemptID executionAttemptId) {
+ completedAttemptFutures.get(executionAttemptId).complete(null);
+ }
+
+ CompletableFuture<Void> getCompletedFuture() {
+ return completedFuture;
+ }
+ }
+
+ private final UnresolvedTaskManagerLocation taskManagerLocation =
+ new LocalUnresolvedTaskManagerLocation();
+ private final ConcurrentMap<ExecutionAttemptID, TaskDeploymentDescriptor>
descriptors =
+ new ConcurrentHashMap<>();
+
+ private final TestingRpcService rpcService;
+ private final JobID jobId;
+ private final JobMasterGateway jobMasterGateway;
+ private final TaskExecutorGateway taskExecutorGateway;
+
+ private final CompletableFuture<ExecutionGraphInfo>
executionGraphInfoFuture;
+
+ private final CompletableFuture<List<TaskDeploymentDescriptor>>
descriptorsFuture =
+ new CompletableFuture<>();
+
+ private final ConcurrentMap<Long, CheckpointCompletionHandler> checkpoints
=
+ new ConcurrentHashMap<>();
+
+ public JobMasterGatewayTester(
+ TestingRpcService rpcService, JobID jobId, JobMasterGateway
jobMasterGateway) {
+ this.rpcService = rpcService;
+ this.jobId = jobId;
+ this.jobMasterGateway = jobMasterGateway;
+ this.taskExecutorGateway = createTaskExecutorGateway();
+ executionGraphInfoFuture = jobMasterGateway.requestJob(TIMEOUT);
+ }
+
+ public CompletableFuture<Acknowledge> transitionTo(
+ List<TaskDeploymentDescriptor> descriptors, ExecutionState state) {
+ final List<CompletableFuture<Acknowledge>> futures =
+ descriptors.stream()
+ .map(TaskDeploymentDescriptor::getExecutionAttemptId)
+ .map(
+ attemptId ->
+
jobMasterGateway.updateTaskExecutionState(
+ new
TaskExecutionState(attemptId, state)))
+ .collect(Collectors.toList());
+ return FutureUtils.completeAll(futures).thenApply(ignored ->
Acknowledge.get());
+ }
+
+ public CompletableFuture<List<TaskDeploymentDescriptor>>
deployVertices(int numSlots) {
+ return jobMasterGateway
+ .registerTaskManager(
+ taskExecutorGateway.getAddress(), taskManagerLocation,
jobId, TIMEOUT)
+ .thenCompose(ignored -> offerSlots(numSlots))
+ .thenCompose(ignored -> descriptorsFuture);
+ }
+
+ public CompletableFuture<Void> awaitCheckpoint(long checkpointId) {
Review comment:
From the method name I would expect that this method blocks until the
checkpoint complete notification has been sent.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobRecoveryITCase.java
##########
@@ -21,62 +21,60 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.testutils.MiniClusterResource;
-import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.TestLogger;
-import org.junit.ClassRule;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
-
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
+import java.util.concurrent.Executor;
/** Tests for the recovery of task failures. */
public class JobRecoveryITCase extends TestLogger {
- private static final int NUM_TMS = 1;
- private static final int SLOTS_PER_TM = 11;
- private static final int PARALLELISM = NUM_TMS * SLOTS_PER_TM;
+ private static final int PARALLELISM = 1;
- @ClassRule
- public static final MiniClusterResource MINI_CLUSTER_RESOURCE =
- new MiniClusterResource(
- new MiniClusterResourceConfiguration.Builder()
- .setNumberTaskManagers(NUM_TMS)
- .setNumberSlotsPerTaskManager(SLOTS_PER_TM)
- .build());
+ private static class MyHaServices extends
EmbeddedHaServicesWithLeadershipControl {
- @Test
- public void testTaskFailureRecovery() throws Exception {
- runTaskFailureRecoveryTest(createjobGraph(false));
+ public MyHaServices(Executor executor) {
+ super(executor);
+ }
}
@Test
- public void testTaskFailureWithSlotSharingRecovery() throws Exception {
- runTaskFailureRecoveryTest(createjobGraph(true));
- }
-
- private void runTaskFailureRecoveryTest(final JobGraph jobGraph) throws
Exception {
- final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
-
- miniCluster.submitJob(jobGraph).get();
-
- final CompletableFuture<JobResult> jobResultFuture =
- miniCluster.requestJobResult(jobGraph.getJobID());
-
- assertThat(jobResultFuture.get().isSuccess(), is(true));
+ public void testTaskFailureRecovery() throws Exception {
+ final TestingMiniClusterConfiguration configuration =
+ new TestingMiniClusterConfiguration.Builder()
+ //
.setNumberDispatcherResourceManagerComponents(2)
Review comment:
Is this a left-over from some testing?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
##########
@@ -0,0 +1,205 @@
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneRunningJobsRegistry;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.TestingJobGraphStore;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TimeUtils;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+public class DispatcherFailoverITCase extends AbstractDispatcherTest {
+
+ private static final Time TIMEOUT = Time.seconds(1);
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ final CompletedCheckpointStore completedCheckpointStore =
+ new EmbeddedCompletedCheckpointStore();
+ haServices.setCheckpointRecoveryFactory(
+ PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs(
+ completedCheckpointStore, new
StandaloneCheckpointIDCounter()));
+ }
+
+ @Test
+ public void
testRecoverFromCheckpointAfterJobGraphRemovalOfTerminatedJobFailed()
+ throws Exception {
+ final JobGraph jobGraph = createJobGraph();
+ final JobID jobId = jobGraph.getJobID();
+
+ // Construct job graph store.
+ final Error jobGraphRemovalError = new Error("Unable to remove job
graph.");
+ final TestingJobGraphStore jobGraphStore =
+ TestingJobGraphStore.newBuilder()
+ .setRemoveJobGraphConsumer(
+ graph -> {
+ throw jobGraphRemovalError;
+ })
+ .build();
+ jobGraphStore.start(null);
+ haServices.setJobGraphStore(jobGraphStore);
+
+ // Construct leader election service.
+ final TestingLeaderElectionService leaderElectionService =
+ new TestingLeaderElectionService();
+ haServices.setJobMasterLeaderElectionService(jobId,
leaderElectionService);
+
+ // Start the first dispatcher and submit the job.
+ final CountDownLatch jobGraphRemovalErrorReceived = new
CountDownLatch(1);
+ final Dispatcher dispatcher =
+ createRecoveredDispatcher(
+ throwable -> {
+ final Optional<Error> maybeError =
+ ExceptionUtils.findThrowable(throwable,
Error.class);
+ if (maybeError.isPresent()
+ &&
jobGraphRemovalError.equals(maybeError.get())) {
+ jobGraphRemovalErrorReceived.countDown();
+ } else {
+ testingFatalErrorHandlerResource
+ .getFatalErrorHandler()
+ .onFatalError(throwable);
+ }
+ });
+ leaderElectionService.isLeader(UUID.randomUUID());
+ final DispatcherGateway dispatcherGateway =
+ dispatcher.getSelfGateway(DispatcherGateway.class);
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+ awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
+
+ // Run vertices, checkpoint and finish.
+ final JobMasterGateway jobMasterGateway =
dispatcher.getJobMasterGateway(jobId).get();
Review comment:
This call is potentially dangerous because we are now accessing
`Dispatcher.runningJobs` from two threads. Since there is no memory barrier, it
is not guaranteed that the testing thread sees the changes the `Dispatcher`
main thread has made.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -746,29 +747,43 @@ private void registerJobManagerRunnerTerminationFuture(
private CompletableFuture<Void> removeJob(JobID jobId, CleanupJobState
cleanupJobState) {
final JobManagerRunner job = checkNotNull(runningJobs.remove(jobId));
- return CompletableFuture.runAsync(
- () -> cleanUpJobData(jobId,
cleanupJobState.cleanupHAData), ioExecutor)
- .thenComposeAsync(ignored -> job.closeAsync(),
getMainThreadExecutor());
+ return CompletableFuture.supplyAsync(
+ () -> cleanUpJobGraph(jobId,
cleanupJobState.cleanupHAData), ioExecutor)
+ .thenCompose(
+ jobGraphRemoved -> job.closeAsync().thenApply(ignored
-> jobGraphRemoved))
+ .thenAcceptAsync(
+ jobGraphRemoved ->
+ cleanUpRemainingJobData(
+ jobId, cleanupJobState.cleanupHAData,
jobGraphRemoved),
+ ioExecutor);
}
- private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
- jobManagerMetricGroup.removeJob(jobId);
-
- boolean jobGraphRemoved = false;
+ private boolean cleanUpJobGraph(JobID jobId, boolean cleanupHA) {
if (cleanupHA) {
try {
jobGraphWriter.removeJobGraph(jobId);
-
// only clean up the HA blobs and ha service data for the
particular job
// if we could remove the job from HA storage
- jobGraphRemoved = true;
+ return true;
} catch (Exception e) {
log.warn(
"Could not properly remove job {} from submitted job
graph store.",
jobId,
e);
+ return false;
}
+ }
+ try {
+ jobGraphWriter.releaseJobGraph(jobId);
+ } catch (Exception e) {
+ log.warn("Could not properly release job {} from submitted job
graph store.", jobId, e);
+ }
+ return false;
+ }
+ private void cleanUpRemainingJobData(JobID jobId, boolean cleanupHA,
boolean jobGraphRemoved) {
+ jobManagerMetricGroup.removeJob(jobId);
+ if (cleanupHA) {
Review comment:
The else branch should be covered by `cleanUpJobGraph()`.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobRecoveryITCase.java
##########
@@ -21,62 +21,60 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.minicluster.MiniCluster;
-import org.apache.flink.runtime.testutils.MiniClusterResource;
-import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.TestLogger;
-import org.junit.ClassRule;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
-
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
+import java.util.concurrent.Executor;
/** Tests for the recovery of task failures. */
public class JobRecoveryITCase extends TestLogger {
- private static final int NUM_TMS = 1;
- private static final int SLOTS_PER_TM = 11;
- private static final int PARALLELISM = NUM_TMS * SLOTS_PER_TM;
+ private static final int PARALLELISM = 1;
- @ClassRule
- public static final MiniClusterResource MINI_CLUSTER_RESOURCE =
- new MiniClusterResource(
- new MiniClusterResourceConfiguration.Builder()
- .setNumberTaskManagers(NUM_TMS)
- .setNumberSlotsPerTaskManager(SLOTS_PER_TM)
- .build());
+ private static class MyHaServices extends
EmbeddedHaServicesWithLeadershipControl {
- @Test
- public void testTaskFailureRecovery() throws Exception {
- runTaskFailureRecoveryTest(createjobGraph(false));
+ public MyHaServices(Executor executor) {
+ super(executor);
+ }
}
@Test
- public void testTaskFailureWithSlotSharingRecovery() throws Exception {
- runTaskFailureRecoveryTest(createjobGraph(true));
- }
-
- private void runTaskFailureRecoveryTest(final JobGraph jobGraph) throws
Exception {
- final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
-
- miniCluster.submitJob(jobGraph).get();
-
- final CompletableFuture<JobResult> jobResultFuture =
- miniCluster.requestJobResult(jobGraph.getJobID());
-
- assertThat(jobResultFuture.get().isSuccess(), is(true));
+ public void testTaskFailureRecovery() throws Exception {
+ final TestingMiniClusterConfiguration configuration =
+ new TestingMiniClusterConfiguration.Builder()
+ //
.setNumberDispatcherResourceManagerComponents(2)
+ .setNumTaskManagers(1)
+ .setNumTaskManagers(1)
+ .build();
+ final HighAvailabilityServices haServices =
+ new MyHaServices(TestingUtils.defaultExecutor());
+
+ try (TestingMiniCluster miniCluster =
+ new TestingMiniCluster(configuration, () -> haServices)) {
+ miniCluster.start();
+ JobGraph jobGraph = createjobGraph(true);
+ miniCluster.submitJob(jobGraph).get();
+ final CompletableFuture<JobResult> jobResultFuture =
+ miniCluster.requestJobResult(jobGraph.getJobID());
+ JobResult jobResult = jobResultFuture.get();
+ System.out.println(jobResult.getApplicationStatus());
+ }
Review comment:
It looks as if we should revert this change.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobMasterGatewayTester.java
##########
@@ -0,0 +1,262 @@
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class JobMasterGatewayTester implements Closeable {
Review comment:
I like this tester. Maybe we should name it `JobMasterTest` since it
seems to test the `JobMaster` implementation.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
##########
@@ -0,0 +1,205 @@
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneRunningJobsRegistry;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.TestingJobGraphStore;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TimeUtils;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+public class DispatcherFailoverITCase extends AbstractDispatcherTest {
+
+ private static final Time TIMEOUT = Time.seconds(1);
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ final CompletedCheckpointStore completedCheckpointStore =
+ new EmbeddedCompletedCheckpointStore();
+ haServices.setCheckpointRecoveryFactory(
+ PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs(
+ completedCheckpointStore, new
StandaloneCheckpointIDCounter()));
+ }
+
+ @Test
+ public void
testRecoverFromCheckpointAfterJobGraphRemovalOfTerminatedJobFailed()
+ throws Exception {
+ final JobGraph jobGraph = createJobGraph();
+ final JobID jobId = jobGraph.getJobID();
+
+ // Construct job graph store.
+ final Error jobGraphRemovalError = new Error("Unable to remove job
graph.");
+ final TestingJobGraphStore jobGraphStore =
+ TestingJobGraphStore.newBuilder()
+ .setRemoveJobGraphConsumer(
+ graph -> {
+ throw jobGraphRemovalError;
+ })
+ .build();
+ jobGraphStore.start(null);
+ haServices.setJobGraphStore(jobGraphStore);
+
+ // Construct leader election service.
+ final TestingLeaderElectionService leaderElectionService =
+ new TestingLeaderElectionService();
+ haServices.setJobMasterLeaderElectionService(jobId,
leaderElectionService);
+
+ // Start the first dispatcher and submit the job.
+ final CountDownLatch jobGraphRemovalErrorReceived = new
CountDownLatch(1);
+ final Dispatcher dispatcher =
+ createRecoveredDispatcher(
+ throwable -> {
+ final Optional<Error> maybeError =
+ ExceptionUtils.findThrowable(throwable,
Error.class);
+ if (maybeError.isPresent()
+ &&
jobGraphRemovalError.equals(maybeError.get())) {
+ jobGraphRemovalErrorReceived.countDown();
+ } else {
+ testingFatalErrorHandlerResource
+ .getFatalErrorHandler()
+ .onFatalError(throwable);
+ }
+ });
+ leaderElectionService.isLeader(UUID.randomUUID());
+ final DispatcherGateway dispatcherGateway =
+ dispatcher.getSelfGateway(DispatcherGateway.class);
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+ awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
+
+ // Run vertices, checkpoint and finish.
+ final JobMasterGateway jobMasterGateway =
dispatcher.getJobMasterGateway(jobId).get();
+ try (final JobMasterGatewayTester tester =
+ new JobMasterGatewayTester(rpcService, jobId,
jobMasterGateway)) {
+ final List<TaskDeploymentDescriptor> descriptors =
tester.deployVertices(2).get();
+ tester.transitionTo(descriptors,
ExecutionState.INITIALIZING).get();
+ tester.transitionTo(descriptors, ExecutionState.RUNNING).get();
+ tester.awaitCheckpoint(1L).get();
+ tester.transitionTo(descriptors, ExecutionState.FINISHED).get();
+ }
+ awaitStatus(dispatcherGateway, jobId, JobStatus.FINISHED);
+ assertTrue(jobGraphRemovalErrorReceived.await(5, TimeUnit.SECONDS));
+
+ // First dispatcher is in a weird state (in a real world scenario,
we'd just kill the whole
+ // process), just remove it's leadership for now, so no extra cleanup
is performed
Review comment:
`leaderElectionService` is only responsible for the `JobMaster`
leadership for the job `jobId` and not the `Dispatcher`.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
##########
@@ -0,0 +1,205 @@
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneRunningJobsRegistry;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.TestingJobGraphStore;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TimeUtils;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+public class DispatcherFailoverITCase extends AbstractDispatcherTest {
+
+ private static final Time TIMEOUT = Time.seconds(1);
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ final CompletedCheckpointStore completedCheckpointStore =
+ new EmbeddedCompletedCheckpointStore();
+ haServices.setCheckpointRecoveryFactory(
+ PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs(
+ completedCheckpointStore, new
StandaloneCheckpointIDCounter()));
+ }
+
+ @Test
+ public void
testRecoverFromCheckpointAfterJobGraphRemovalOfTerminatedJobFailed()
+ throws Exception {
+ final JobGraph jobGraph = createJobGraph();
+ final JobID jobId = jobGraph.getJobID();
+
+ // Construct job graph store.
+ final Error jobGraphRemovalError = new Error("Unable to remove job
graph.");
+ final TestingJobGraphStore jobGraphStore =
+ TestingJobGraphStore.newBuilder()
+ .setRemoveJobGraphConsumer(
+ graph -> {
+ throw jobGraphRemovalError;
+ })
+ .build();
+ jobGraphStore.start(null);
+ haServices.setJobGraphStore(jobGraphStore);
+
+ // Construct leader election service.
+ final TestingLeaderElectionService leaderElectionService =
+ new TestingLeaderElectionService();
+ haServices.setJobMasterLeaderElectionService(jobId,
leaderElectionService);
+
+ // Start the first dispatcher and submit the job.
+ final CountDownLatch jobGraphRemovalErrorReceived = new
CountDownLatch(1);
+ final Dispatcher dispatcher =
+ createRecoveredDispatcher(
+ throwable -> {
+ final Optional<Error> maybeError =
+ ExceptionUtils.findThrowable(throwable,
Error.class);
+ if (maybeError.isPresent()
+ &&
jobGraphRemovalError.equals(maybeError.get())) {
+ jobGraphRemovalErrorReceived.countDown();
+ } else {
+ testingFatalErrorHandlerResource
+ .getFatalErrorHandler()
+ .onFatalError(throwable);
+ }
+ });
+ leaderElectionService.isLeader(UUID.randomUUID());
+ final DispatcherGateway dispatcherGateway =
+ dispatcher.getSelfGateway(DispatcherGateway.class);
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+ awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
+
+ // Run vertices, checkpoint and finish.
+ final JobMasterGateway jobMasterGateway =
dispatcher.getJobMasterGateway(jobId).get();
+ try (final JobMasterGatewayTester tester =
+ new JobMasterGatewayTester(rpcService, jobId,
jobMasterGateway)) {
+ final List<TaskDeploymentDescriptor> descriptors =
tester.deployVertices(2).get();
+ tester.transitionTo(descriptors,
ExecutionState.INITIALIZING).get();
+ tester.transitionTo(descriptors, ExecutionState.RUNNING).get();
+ tester.awaitCheckpoint(1L).get();
+ tester.transitionTo(descriptors, ExecutionState.FINISHED).get();
+ }
+ awaitStatus(dispatcherGateway, jobId, JobStatus.FINISHED);
+ assertTrue(jobGraphRemovalErrorReceived.await(5, TimeUnit.SECONDS));
+
+ // First dispatcher is in a weird state (in a real world scenario,
we'd just kill the whole
+ // process), just remove it's leadership for now, so no extra cleanup
is performed
+ leaderElectionService.stop();
+
+ // Run a second dispatcher, that restores our finished job.
+ final Dispatcher secondDispatcher = createRecoveredDispatcher(null);
+ final DispatcherGateway secondDispatcherGateway =
+ secondDispatcher.getSelfGateway(DispatcherGateway.class);
+ leaderElectionService.isLeader(UUID.randomUUID());
+ awaitStatus(secondDispatcherGateway, jobId, JobStatus.RUNNING);
+
+ // Now make sure that restored job started from checkpoint.
+ final JobMasterGateway secondJobMasterGateway =
+ secondDispatcher.getJobMasterGateway(jobId).get();
+ try (final JobMasterGatewayTester tester =
+ new JobMasterGatewayTester(rpcService, jobId,
secondJobMasterGateway)) {
+ final List<TaskDeploymentDescriptor> descriptors =
tester.deployVertices(2).get();
+ final Optional<JobManagerTaskRestore> maybeRestore =
+ descriptors.stream()
+ .map(TaskDeploymentDescriptor::getTaskRestore)
+ .filter(Objects::nonNull)
+ .findAny();
+ assertTrue("Job has recovered from checkpoint.",
maybeRestore.isPresent());
+ }
+
+ // Kill the first dispatcher. This should fail, but we need to cleanup
anyway, so the
+ // `rpcService` class rule succeeds
+ assertThrows(
+ ExecutionException.class, () ->
RpcUtils.terminateRpcEndpoint(dispatcher, TIMEOUT));
+
+ // Kill the second dispatcher.
+ RpcUtils.terminateRpcEndpoint(secondDispatcher, TIMEOUT);
+ }
+
+ private JobGraph createJobGraph() {
+ final JobVertex firstVertex = new JobVertex("first");
+ firstVertex.setInvokableClass(NoOpInvokable.class);
+ firstVertex.setParallelism(1);
+
+ final JobVertex secondVertex = new JobVertex("second");
+ secondVertex.setInvokableClass(NoOpInvokable.class);
+ secondVertex.setParallelism(1);
+
+ final CheckpointCoordinatorConfiguration
checkpointCoordinatorConfiguration =
+ CheckpointCoordinatorConfiguration.builder()
+ .setCheckpointInterval(100L)
Review comment:
Maybe configure `20L`. That way the test is a bit faster.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobMasterGatewayTester.java
##########
@@ -0,0 +1,262 @@
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class JobMasterGatewayTester implements Closeable {
+
+ private static final Time TIMEOUT = Time.seconds(1);
+
+ private static TaskStateSnapshot
createNonEmptyStateSnapshot(TaskInformation taskInformation) {
+ final TaskStateSnapshot checkpointStateHandles = new
TaskStateSnapshot();
+ checkpointStateHandles.putSubtaskStateByOperatorID(
+ OperatorID.fromJobVertexID(taskInformation.getJobVertexId()),
+ OperatorSubtaskState.builder()
+ .setManagedOperatorState(
+ new OperatorStreamStateHandle(
+ Collections.emptyMap(),
+ new ByteStreamStateHandle("foobar",
new byte[0])))
+ .build());
+ return checkpointStateHandles;
+ }
+
+ private static class CheckpointCompletionHandler {
+
+ private final Map<ExecutionAttemptID, CompletableFuture<Void>>
completedAttemptFutures;
+ private final CompletableFuture<Void> completedFuture;
+
+ public CheckpointCompletionHandler(List<TaskDeploymentDescriptor>
descriptors) {
+ this.completedAttemptFutures =
+ descriptors.stream()
+
.map(TaskDeploymentDescriptor::getExecutionAttemptId)
+ .collect(
+ Collectors.toMap(
+ Function.identity(),
+ ignored -> new
CompletableFuture<>()));
+ this.completedFuture =
FutureUtils.completeAll(completedAttemptFutures.values());
+ }
+
+ void completeAttempt(ExecutionAttemptID executionAttemptId) {
+ completedAttemptFutures.get(executionAttemptId).complete(null);
+ }
+
+ CompletableFuture<Void> getCompletedFuture() {
+ return completedFuture;
+ }
+ }
+
+ private final UnresolvedTaskManagerLocation taskManagerLocation =
+ new LocalUnresolvedTaskManagerLocation();
+ private final ConcurrentMap<ExecutionAttemptID, TaskDeploymentDescriptor>
descriptors =
+ new ConcurrentHashMap<>();
+
+ private final TestingRpcService rpcService;
+ private final JobID jobId;
+ private final JobMasterGateway jobMasterGateway;
+ private final TaskExecutorGateway taskExecutorGateway;
+
+ private final CompletableFuture<ExecutionGraphInfo>
executionGraphInfoFuture;
+
+ private final CompletableFuture<List<TaskDeploymentDescriptor>>
descriptorsFuture =
+ new CompletableFuture<>();
+
+ private final ConcurrentMap<Long, CheckpointCompletionHandler> checkpoints
=
+ new ConcurrentHashMap<>();
+
+ public JobMasterGatewayTester(
+ TestingRpcService rpcService, JobID jobId, JobMasterGateway
jobMasterGateway) {
+ this.rpcService = rpcService;
+ this.jobId = jobId;
+ this.jobMasterGateway = jobMasterGateway;
+ this.taskExecutorGateway = createTaskExecutorGateway();
+ executionGraphInfoFuture = jobMasterGateway.requestJob(TIMEOUT);
+ }
+
+ public CompletableFuture<Acknowledge> transitionTo(
+ List<TaskDeploymentDescriptor> descriptors, ExecutionState state) {
+ final List<CompletableFuture<Acknowledge>> futures =
+ descriptors.stream()
+ .map(TaskDeploymentDescriptor::getExecutionAttemptId)
+ .map(
+ attemptId ->
+
jobMasterGateway.updateTaskExecutionState(
+ new
TaskExecutionState(attemptId, state)))
+ .collect(Collectors.toList());
+ return FutureUtils.completeAll(futures).thenApply(ignored ->
Acknowledge.get());
+ }
+
+ public CompletableFuture<List<TaskDeploymentDescriptor>>
deployVertices(int numSlots) {
+ return jobMasterGateway
+ .registerTaskManager(
+ taskExecutorGateway.getAddress(), taskManagerLocation,
jobId, TIMEOUT)
+ .thenCompose(ignored -> offerSlots(numSlots))
+ .thenCompose(ignored -> descriptorsFuture);
+ }
+
+ public CompletableFuture<Void> awaitCheckpoint(long checkpointId) {
+ return descriptorsFuture.thenCompose(
+ descriptors ->
+ checkpoints
+ .computeIfAbsent(
+ checkpointId,
+ key -> new
CheckpointCompletionHandler(descriptors))
+ .getCompletedFuture());
+ }
+
+ @Override
+ public void close() throws IOException {
+ rpcService.unregisterGateway(taskExecutorGateway.getAddress());
+ }
+
+ private TaskExecutorGateway createTaskExecutorGateway() {
+ final AtomicReference<TaskExecutorGateway>
taskExecutorGatewayReference =
+ new AtomicReference<>();
+ final TestingTaskExecutorGateway taskExecutorGateway =
+ new TestingTaskExecutorGatewayBuilder()
+ .setSubmitTaskConsumer(this::onSubmitTaskConsumer)
+
.setTriggerCheckpointFunction(this::onTriggerCheckpoint)
+
.setConfirmCheckpointFunction(this::onConfirmCheckpoint)
+ .createTestingTaskExecutorGateway();
+ taskExecutorGatewayReference.set(taskExecutorGateway);
Review comment:
What do we need the `taskExecutorGatewayReference` for?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -381,7 +381,8 @@ private boolean isPartialResourceConfigured(JobGraph
jobGraph) {
return persistAndRunFuture.handleAsync(
(acknowledge, throwable) -> {
if (throwable != null) {
- cleanUpJobData(jobGraph.getJobID(), true);
+ final boolean jobGraphRemoved =
cleanUpJobGraph(jobGraph.getJobID(), true);
+ cleanUpRemainingJobData(jobGraph.getJobID(), true,
jobGraphRemoved);
Review comment:
Maybe we can introduce a helper method for this to give this operation a
good name.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
##########
@@ -68,6 +68,8 @@ public void testShutdownCheckpointCoordinatorOnFailure()
throws Exception {
graph.failJob(new Exception("Test Exception"),
System.currentTimeMillis());
+ scheduler.closeAsync().get();
Review comment:
I think these tests are no longer correct. The test name says that the
`CheckpointCoordinator` (CC) should be shut down on an `ExecutionGraph` failure.
However, this is no longer true. It's the same for the other tests.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
##########
@@ -0,0 +1,205 @@
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneRunningJobsRegistry;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.TestingJobGraphStore;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TimeUtils;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+public class DispatcherFailoverITCase extends AbstractDispatcherTest {
+
+ private static final Time TIMEOUT = Time.seconds(1);
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ final CompletedCheckpointStore completedCheckpointStore =
+ new EmbeddedCompletedCheckpointStore();
+ haServices.setCheckpointRecoveryFactory(
+ PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs(
+ completedCheckpointStore, new
StandaloneCheckpointIDCounter()));
+ }
+
+ @Test
+ public void
testRecoverFromCheckpointAfterJobGraphRemovalOfTerminatedJobFailed()
+ throws Exception {
+ final JobGraph jobGraph = createJobGraph();
+ final JobID jobId = jobGraph.getJobID();
+
+ // Construct job graph store.
+ final Error jobGraphRemovalError = new Error("Unable to remove job
graph.");
+ final TestingJobGraphStore jobGraphStore =
+ TestingJobGraphStore.newBuilder()
+ .setRemoveJobGraphConsumer(
+ graph -> {
+ throw jobGraphRemovalError;
+ })
+ .build();
+ jobGraphStore.start(null);
+ haServices.setJobGraphStore(jobGraphStore);
+
+ // Construct leader election service.
+ final TestingLeaderElectionService leaderElectionService =
+ new TestingLeaderElectionService();
+ haServices.setJobMasterLeaderElectionService(jobId,
leaderElectionService);
+
+ // Start the first dispatcher and submit the job.
+ final CountDownLatch jobGraphRemovalErrorReceived = new
CountDownLatch(1);
+ final Dispatcher dispatcher =
+ createRecoveredDispatcher(
+ throwable -> {
+ final Optional<Error> maybeError =
+ ExceptionUtils.findThrowable(throwable,
Error.class);
+ if (maybeError.isPresent()
+ &&
jobGraphRemovalError.equals(maybeError.get())) {
+ jobGraphRemovalErrorReceived.countDown();
+ } else {
+ testingFatalErrorHandlerResource
+ .getFatalErrorHandler()
+ .onFatalError(throwable);
+ }
+ });
+ leaderElectionService.isLeader(UUID.randomUUID());
+ final DispatcherGateway dispatcherGateway =
+ dispatcher.getSelfGateway(DispatcherGateway.class);
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+ awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
+
+ // Run vertices, checkpoint and finish.
+ final JobMasterGateway jobMasterGateway =
dispatcher.getJobMasterGateway(jobId).get();
+ try (final JobMasterGatewayTester tester =
+ new JobMasterGatewayTester(rpcService, jobId,
jobMasterGateway)) {
+ final List<TaskDeploymentDescriptor> descriptors =
tester.deployVertices(2).get();
+ tester.transitionTo(descriptors,
ExecutionState.INITIALIZING).get();
+ tester.transitionTo(descriptors, ExecutionState.RUNNING).get();
+ tester.awaitCheckpoint(1L).get();
+ tester.transitionTo(descriptors, ExecutionState.FINISHED).get();
+ }
+ awaitStatus(dispatcherGateway, jobId, JobStatus.FINISHED);
+ assertTrue(jobGraphRemovalErrorReceived.await(5, TimeUnit.SECONDS));
+
+ // First dispatcher is in a weird state (in a real world scenario,
we'd just kill the whole
+ // process), just remove it's leadership for now, so no extra cleanup
is performed
Review comment:
If you want to simulate the lost leadership of a `Dispatcher`, then you
should close `dispatcher`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]