tillrohrmann commented on a change in pull request #16535:
URL: https://github.com/apache/flink/pull/16535#discussion_r682701270
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -743,31 +744,43 @@ private void registerJobManagerRunnerTerminationFuture(
private CompletableFuture<Void> removeJob(JobID jobId, CleanupJobState
cleanupJobState) {
final JobManagerRunner job = checkNotNull(runningJobs.remove(jobId));
-
- final CompletableFuture<Void> jobTerminationFuture = job.closeAsync();
-
- return jobTerminationFuture.thenRunAsync(
- () -> cleanUpJobData(jobId, cleanupJobState.cleanupHAData),
ioExecutor);
+ return CompletableFuture.supplyAsync(
+ () -> cleanUpJobGraph(jobId,
cleanupJobState.cleanupHAData), ioExecutor)
+ .thenCompose(
+ jobGraphRemoved -> job.closeAsync().thenApply(ignored
-> jobGraphRemoved))
+ .thenAcceptAsync(
+ jobGraphRemoved ->
+ cleanUpRemainingJobData(
+ jobId, cleanupJobState.cleanupHAData,
jobGraphRemoved),
+ ioExecutor);
}
- private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
- jobManagerMetricGroup.removeJob(jobId);
-
- boolean jobGraphRemoved = false;
+ private boolean cleanUpJobGraph(JobID jobId, boolean cleanupHA) {
if (cleanupHA) {
try {
jobGraphWriter.removeJobGraph(jobId);
-
// only clean up the HA blobs and ha service data for the
particular job
// if we could remove the job from HA storage
- jobGraphRemoved = true;
+ return true;
} catch (Exception e) {
log.warn(
"Could not properly remove job {} from submitted job
graph store.",
jobId,
e);
+ return false;
}
+ }
+ try {
+ jobGraphWriter.releaseJobGraph(jobId);
+ } catch (Exception e) {
+ log.warn("Could not properly release job {} from submitted job
graph store.", jobId, e);
+ }
+ return false;
+ }
+ private void cleanUpRemainingJobData(JobID jobId, boolean cleanupHA,
boolean jobGraphRemoved) {
Review comment:
I am wondering whether we can replace `cleanupHA` with `jobGraphRemoved`
here. I know that this is not the same as before because it would mean that we
keep the `runningJobsRegistry` entry if we cannot remove the job graph, but it
seems a lot easier to understand this function like this.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -219,11 +156,19 @@ private TestingDispatcher createAndStartDispatcher(
TestingHighAvailabilityServices haServices,
JobManagerRunnerFactory jobManagerRunnerFactory)
throws Exception {
+ JobGraphWriter jobGraphWriter;
+ try {
+ jobGraphWriter = haServices.getJobGraphStore();
+ } catch (IllegalStateException e) {
+ jobGraphWriter = NoOpJobGraphWriter.INSTANCE;
+ }
Review comment:
Instead of doing this, would it make sense to initialize the
`haServices` field with a `StandaloneJobGraphStore`?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -743,31 +744,43 @@ private void registerJobManagerRunnerTerminationFuture(
private CompletableFuture<Void> removeJob(JobID jobId, CleanupJobState
cleanupJobState) {
final JobManagerRunner job = checkNotNull(runningJobs.remove(jobId));
-
- final CompletableFuture<Void> jobTerminationFuture = job.closeAsync();
-
- return jobTerminationFuture.thenRunAsync(
- () -> cleanUpJobData(jobId, cleanupJobState.cleanupHAData),
ioExecutor);
+ return CompletableFuture.supplyAsync(
+ () -> cleanUpJobGraph(jobId,
cleanupJobState.cleanupHAData), ioExecutor)
+ .thenCompose(
+ jobGraphRemoved -> job.closeAsync().thenApply(ignored
-> jobGraphRemoved))
+ .thenAcceptAsync(
+ jobGraphRemoved ->
+ cleanUpRemainingJobData(
+ jobId, cleanupJobState.cleanupHAData,
jobGraphRemoved),
+ ioExecutor);
}
- private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
- jobManagerMetricGroup.removeJob(jobId);
-
- boolean jobGraphRemoved = false;
+ private boolean cleanUpJobGraph(JobID jobId, boolean cleanupHA) {
if (cleanupHA) {
try {
jobGraphWriter.removeJobGraph(jobId);
-
// only clean up the HA blobs and ha service data for the
particular job
// if we could remove the job from HA storage
Review comment:
Maybe remove this comment and add a JavaDoc to the method stating the
purpose of the return value.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -232,17 +177,10 @@ private TestingDispatcher createAndStartDispatcher(
@After
public void tearDown() throws Exception {
+ super.tearDown();
Review comment:
I think this should go after the if block because the dispatcher might
still use some of the services terminated in `super.tearDown()`.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -850,6 +813,73 @@ public void
testInitializationTimestampForwardedToJobManagerRunner() throws Exce
assertThat(initializationTimestamp, greaterThan(0L));
}
+ @Test
+ public void testJobDataAreCleanedUpInCorrectOrderOnFinishedJob() throws
Exception {
+ testJobDataAreCleanedUpInCorrectOrder(JobStatus.FINISHED);
+ }
+
+ @Test
+ public void testJobDataAreCleanedUpInCorrectOrderOnFailedJob() throws
Exception {
+ testJobDataAreCleanedUpInCorrectOrder(JobStatus.FAILED);
+ }
+
+ private void testJobDataAreCleanedUpInCorrectOrder(JobStatus jobStatus)
throws Exception {
+ final BlockingQueue<String> cleanUpEvents = new
LinkedBlockingQueue<>();
+
+ // Track cleanup - ha-services
+ final CompletableFuture<JobID> cleanupJobData = new
CompletableFuture<>();
+ haServices.setCleanupJobDataFuture(cleanupJobData);
+ cleanupJobData.thenAccept(jobId -> cleanUpEvents.add("ha-services"));
+
+ // Track cleanup - job-graph
+ final TestingJobGraphStore jobGraphStore =
+ TestingJobGraphStore.newBuilder()
+ .setReleaseJobGraphConsumer(jobId ->
cleanUpEvents.add("job-graph-release"))
+ .setRemoveJobGraphConsumer(jobId ->
cleanUpEvents.add("job-graph-remove"))
+ .build();
+ jobGraphStore.start(null);
+ haServices.setJobGraphStore(jobGraphStore);
+
+ // Track cleanup - running jobs registry
+ haServices.setRunningJobsRegistry(
+ new StandaloneRunningJobsRegistry() {
+
+ @Override
+ public void clearJob(JobID jobID) {
+ super.clearJob(jobID);
+ cleanUpEvents.add("running-jobs-registry");
+ }
+ });
+
+ final CompletableFuture<JobManagerRunnerResult> resultFuture = new
CompletableFuture<>();
+ dispatcher =
+ createAndStartDispatcher(
+ heartbeatServices,
+ haServices,
+ new FinishingJobManagerRunnerFactory(
+ resultFuture, () ->
cleanUpEvents.add("job-manager-runner")));
+
+ final DispatcherGateway dispatcherGateway =
+ dispatcher.getSelfGateway(DispatcherGateway.class);
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+ resultFuture.complete(
+ JobManagerRunnerResult.forSuccess(
+ new ExecutionGraphInfo(
+ new
ArchivedExecutionGraphBuilder().setState(jobStatus).build())));
+
+ // Wait for job to terminate.
+ dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();
+
+ assertThat(
+ new ArrayList<>(cleanUpEvents),
+ equalTo(
+ Arrays.asList(
+ "job-graph-remove",
+ "job-manager-runner",
+ "running-jobs-registry",
+ "ha-services")));
Review comment:
Same here: we could introduce constants for these event names.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneRunningJobsRegistry;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.TestingJobGraphStore;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+
+/** An integration test for various fail-over scenarios of the {@link
Dispatcher} component. */
+public class DispatcherFailoverITCase extends AbstractDispatcherTest {
+
+ private static final Time TIMEOUT = Time.seconds(1);
+
+ private final BlockingQueue<RpcEndpoint> toTerminate = new
LinkedBlockingQueue<>();
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ final CompletedCheckpointStore completedCheckpointStore =
+ new EmbeddedCompletedCheckpointStore();
+ haServices.setCheckpointRecoveryFactory(
+ PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs(
+ completedCheckpointStore, new
StandaloneCheckpointIDCounter()));
+ }
+
+ @After
+ public void tearDown() {
+ while (!toTerminate.isEmpty()) {
+ final RpcEndpoint endpoint = toTerminate.poll();
+ try {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ } catch (Exception e) {
+ // Ignore.
+ }
+ }
+ }
+
+ @Test
+ public void
testRecoverFromCheckpointAfterJobGraphRemovalOfTerminatedJobFailed()
+ throws Exception {
+ final JobGraph jobGraph = createJobGraph();
+ final JobID jobId = jobGraph.getJobID();
+
+ // Construct job graph store.
+ final Error jobGraphRemovalError = new Error("Unable to remove job
graph.");
+ final TestingJobGraphStore jobGraphStore =
+ TestingJobGraphStore.newBuilder()
+ .setRemoveJobGraphConsumer(
+ graph -> {
+ throw jobGraphRemovalError;
+ })
+ .build();
+ jobGraphStore.start(null);
+ haServices.setJobGraphStore(jobGraphStore);
+
+ // Construct leader election service.
+ final TestingLeaderElectionService leaderElectionService =
+ new TestingLeaderElectionService();
+ haServices.setJobMasterLeaderElectionService(jobId,
leaderElectionService);
+
+ // Start the first dispatcher and submit the job.
+ final CountDownLatch jobGraphRemovalErrorReceived = new
CountDownLatch(1);
+ final Dispatcher dispatcher =
+ createRecoveredDispatcher(
+ throwable -> {
+ final Optional<Error> maybeError =
+ ExceptionUtils.findThrowable(throwable,
Error.class);
+ if (maybeError.isPresent()
+ &&
jobGraphRemovalError.equals(maybeError.get())) {
+ jobGraphRemovalErrorReceived.countDown();
+ } else {
+ testingFatalErrorHandlerResource
+ .getFatalErrorHandler()
+ .onFatalError(throwable);
+ }
+ });
+ toTerminate.add(dispatcher);
+ leaderElectionService.isLeader(UUID.randomUUID());
+ final DispatcherGateway dispatcherGateway =
+ dispatcher.getSelfGateway(DispatcherGateway.class);
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+ awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
+
+ // Run vertices, checkpoint and finish.
+ final JobMasterGateway jobMasterGateway =
+ connectToLeadingJobMaster(leaderElectionService).get();
+ try (final JobMasterTester tester =
+ new JobMasterTester(rpcService, jobId, jobMasterGateway)) {
+ final List<TaskDeploymentDescriptor> descriptors =
tester.deployVertices(2).get();
+ tester.transitionTo(descriptors,
ExecutionState.INITIALIZING).get();
+ tester.transitionTo(descriptors, ExecutionState.RUNNING).get();
+ tester.getCheckpointFuture(1L).get();
+ tester.transitionTo(descriptors, ExecutionState.FINISHED).get();
+ }
+ awaitStatus(dispatcherGateway, jobId, JobStatus.FINISHED);
+ assertTrue(jobGraphRemovalErrorReceived.await(5, TimeUnit.SECONDS));
Review comment:
```suggestion
jobGraphRemovalErrorReceived.await();
```
is a bit more robust on our CI infrastructure.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobMasterTester.java
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * A testing utility, that simulates the desired interactions with {@link
JobMasterGateway} RPC.
+ * This is useful for light-weight e2e tests, eg. simulating specific
fail-over scenario.
+ */
+public class JobMasterTester implements Closeable {
+
+ private static final Time TIMEOUT = Time.seconds(1);
Review comment:
I'm wondering whether this timeout is too low for our CI infrastructure.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -652,29 +587,62 @@ public void testFailingJobManagerRunnerCleanup() throws
Exception {
final FlinkException testException = new FlinkException("Test
exception.");
final ArrayBlockingQueue<Optional<Exception>> queue = new
ArrayBlockingQueue<>(2);
- BlockingJobManagerRunnerFactory blockingJobManagerRunnerFactory =
+ final BlockingJobManagerRunnerFactory blockingJobManagerRunnerFactory =
new BlockingJobManagerRunnerFactory(
() -> {
- final Optional<Exception> take = queue.take();
- final Exception exception = take.orElse(null);
-
- if (exception != null) {
- throw exception;
+ final Optional<Exception> maybeException =
queue.take();
+ if (maybeException.isPresent()) {
+ throw maybeException.get();
}
});
+
+ final BlockingQueue<String> cleanUpEvents = new
LinkedBlockingQueue<>();
+
+ // Track cleanup - ha-services
+ final CompletableFuture<JobID> cleanupJobData = new
CompletableFuture<>();
+ haServices.setCleanupJobDataFuture(cleanupJobData);
+ cleanupJobData.thenAccept(jobId -> cleanUpEvents.add("ha-services"));
+
+ // Track cleanup - job-graph
+ final TestingJobGraphStore jobGraphStore =
+ TestingJobGraphStore.newBuilder()
+ .setReleaseJobGraphConsumer(jobId ->
cleanUpEvents.add("job-graph-release"))
+ .setRemoveJobGraphConsumer(jobId ->
cleanUpEvents.add("job-graph-remove"))
+ .build();
+ jobGraphStore.start(null);
+ haServices.setJobGraphStore(jobGraphStore);
+
+ // Track cleanup - running jobs registry
+ haServices.setRunningJobsRegistry(
+ new StandaloneRunningJobsRegistry() {
+
+ @Override
+ public void clearJob(JobID jobID) {
+ super.clearJob(jobID);
+ cleanUpEvents.add("running-jobs-registry");
+ }
+ });
+
dispatcher =
createAndStartDispatcher(
heartbeatServices, haServices,
blockingJobManagerRunnerFactory);
final DispatcherGateway dispatcherGateway =
dispatcher.getSelfGateway(DispatcherGateway.class);
+ // submit and fail during job master runner construction
queue.offer(Optional.of(testException));
try {
dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
} catch (Throwable expectedException) {
assertThat(expectedException, containsCause(FlinkException.class));
assertThat(expectedException,
containsMessage(testException.getMessage()));
+ // make sure we've cleaned up in correct order (including HA)
+ assertThat(
+ new ArrayList<>(cleanUpEvents),
+ equalTo(
+ Arrays.asList(
+ "job-graph-remove",
"running-jobs-registry", "ha-services")));
Review comment:
We could introduce constants for these names. Then it is easier to
maintain them.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobMasterTester.java
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * A testing utility, that simulates the desired interactions with {@link
JobMasterGateway} RPC.
+ * This is useful for light-weight e2e tests, eg. simulating specific
fail-over scenario.
+ */
+public class JobMasterTester implements Closeable {
+
+ private static final Time TIMEOUT = Time.seconds(1);
Review comment:
Since this timeout is used for the RPC calls, I think it is totally fine to
set it to 1 minute without much harm.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.JobGraphWriter;
+import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TimeUtils;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ForkJoinPool;
+
+/** Abstract test for the {@link Dispatcher} component. */
+public class AbstractDispatcherTest extends TestLogger {
+
+ static TestingRpcService rpcService;
+
+ static final Time TIMEOUT = Time.seconds(10L);
Review comment:
I hope that this timeout is long enough for our CI infrastructure.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneRunningJobsRegistry;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.TestingJobGraphStore;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+
+/** An integration test for various fail-over scenarios of the {@link
Dispatcher} component. */
+public class DispatcherFailoverITCase extends AbstractDispatcherTest {
+
+ private static final Time TIMEOUT = Time.seconds(1);
+
+ private final BlockingQueue<RpcEndpoint> toTerminate = new
LinkedBlockingQueue<>();
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ final CompletedCheckpointStore completedCheckpointStore =
+ new EmbeddedCompletedCheckpointStore();
+ haServices.setCheckpointRecoveryFactory(
+ PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs(
+ completedCheckpointStore, new
StandaloneCheckpointIDCounter()));
+ }
+
+ @After
+ public void tearDown() {
+ while (!toTerminate.isEmpty()) {
+ final RpcEndpoint endpoint = toTerminate.poll();
+ try {
+ RpcUtils.terminateRpcEndpoint(endpoint, TIMEOUT);
+ } catch (Exception e) {
+ // Ignore.
+ }
+ }
+ }
+
+ @Test
+ public void
testRecoverFromCheckpointAfterJobGraphRemovalOfTerminatedJobFailed()
+ throws Exception {
+ final JobGraph jobGraph = createJobGraph();
+ final JobID jobId = jobGraph.getJobID();
+
+ // Construct job graph store.
+ final Error jobGraphRemovalError = new Error("Unable to remove job
graph.");
+ final TestingJobGraphStore jobGraphStore =
+ TestingJobGraphStore.newBuilder()
+ .setRemoveJobGraphConsumer(
+ graph -> {
+ throw jobGraphRemovalError;
+ })
+ .build();
+ jobGraphStore.start(null);
+ haServices.setJobGraphStore(jobGraphStore);
+
+ // Construct leader election service.
+ final TestingLeaderElectionService leaderElectionService =
+ new TestingLeaderElectionService();
+ haServices.setJobMasterLeaderElectionService(jobId,
leaderElectionService);
+
+ // Start the first dispatcher and submit the job.
+ final CountDownLatch jobGraphRemovalErrorReceived = new
CountDownLatch(1);
+ final Dispatcher dispatcher =
+ createRecoveredDispatcher(
+ throwable -> {
+ final Optional<Error> maybeError =
+ ExceptionUtils.findThrowable(throwable,
Error.class);
+ if (maybeError.isPresent()
+ &&
jobGraphRemovalError.equals(maybeError.get())) {
+ jobGraphRemovalErrorReceived.countDown();
+ } else {
+ testingFatalErrorHandlerResource
+ .getFatalErrorHandler()
+ .onFatalError(throwable);
+ }
+ });
+ toTerminate.add(dispatcher);
+ leaderElectionService.isLeader(UUID.randomUUID());
+ final DispatcherGateway dispatcherGateway =
+ dispatcher.getSelfGateway(DispatcherGateway.class);
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+ awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
+
+ // Run vertices, checkpoint and finish.
+ final JobMasterGateway jobMasterGateway =
+ connectToLeadingJobMaster(leaderElectionService).get();
+ try (final JobMasterTester tester =
+ new JobMasterTester(rpcService, jobId, jobMasterGateway)) {
+ final List<TaskDeploymentDescriptor> descriptors =
tester.deployVertices(2).get();
+ tester.transitionTo(descriptors,
ExecutionState.INITIALIZING).get();
+ tester.transitionTo(descriptors, ExecutionState.RUNNING).get();
+ tester.getCheckpointFuture(1L).get();
+ tester.transitionTo(descriptors, ExecutionState.FINISHED).get();
+ }
+ awaitStatus(dispatcherGateway, jobId, JobStatus.FINISHED);
+ assertTrue(jobGraphRemovalErrorReceived.await(5, TimeUnit.SECONDS));
+
+ // Remove job master leadership.
+ leaderElectionService.stop();
+
+ // Run a second dispatcher, that restores our finished job.
+ final Dispatcher secondDispatcher = createRecoveredDispatcher(null);
+ toTerminate.add(secondDispatcher);
+ final DispatcherGateway secondDispatcherGateway =
+ secondDispatcher.getSelfGateway(DispatcherGateway.class);
+ UUID uuid = UUID.randomUUID();
+ leaderElectionService.isLeader(uuid);
Review comment:
I have to admit that I was a bit thrown off as to why this works. Then I
realized that `leaderElectionService.stop()` clears the `contender` field. One
way to make it clearer that the old `Dispatcher` and its `JobMaster` are no
longer used is to explicitly close it after the error is received.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]