This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new fe5a171 [FLINK-25486][Runtime/Coordination] Fix the bug that Flink
loses state when the ZooKeeper leader changes
fe5a171 is described below
commit fe5a1718368e62eb7ac47c00aabbd94173dae668
Author: liujiangang <[email protected]>
AuthorDate: Fri Jan 7 18:28:15 2022 +0800
[FLINK-25486][Runtime/Coordination] Fix the bug that Flink loses state
when the ZooKeeper leader changes
This closes #18560.
---
.../flink/runtime/dispatcher/MiniDispatcher.java | 21 +-
.../runtime/dispatcher/JobDispatcherITCase.java | 248 +++++++++++++++++++++
.../runtime/dispatcher/MiniDispatcherTest.java | 29 +++
.../runtime/minicluster/TestingMiniCluster.java | 28 ++-
4 files changed, 318 insertions(+), 8 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index 83f173d..99bb716 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.dispatcher;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
@@ -32,6 +33,7 @@ import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.util.FlinkException;
import java.util.Collections;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -102,8 +104,12 @@ public class MiniDispatcher extends Dispatcher {
? ApplicationStatus.FAILED
: ApplicationStatus.SUCCEEDED;
- log.info("Shutting down cluster because someone
retrieved the job result.");
- shutDownFuture.complete(status);
+ if
(!ApplicationStatus.UNKNOWN.equals(result.getApplicationStatus())) {
+ log.info(
+ "Shutting down cluster because someone
retrieved the job result"
+ + " and the status is globally
terminal.");
+ shutDownFuture.complete(status);
+ }
});
} else {
log.info("Not shutting down cluster after someone retrieved the
job result.");
@@ -124,16 +130,19 @@ public class MiniDispatcher extends Dispatcher {
executionGraphInfo.getArchivedExecutionGraph();
final CleanupJobState cleanupHAState =
super.jobReachedTerminalState(executionGraphInfo);
- if (jobCancelled || executionMode ==
ClusterEntrypoint.ExecutionMode.DETACHED) {
+ JobStatus jobStatus =
+ Objects.requireNonNull(
+ archivedExecutionGraph.getState(), "JobStatus should
not be null here.");
+ if (jobStatus.isGloballyTerminalState()
+ && (jobCancelled || executionMode ==
ClusterEntrypoint.ExecutionMode.DETACHED)) {
// shut down if job is cancelled or we don't have to wait for the
execution result
// retrieval
log.info(
"Shutting down cluster with state {}, jobCancelled: {},
executionMode: {}",
- archivedExecutionGraph.getState(),
+ jobStatus,
jobCancelled,
executionMode);
- shutDownFuture.complete(
-
ApplicationStatus.fromJobStatus(archivedExecutionGraph.getState()));
+
shutDownFuture.complete(ApplicationStatus.fromJobStatus(jobStatus));
}
return cleanupHAState;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java
new file mode 100644
index 0000000..f34b5ee
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
+import org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory;
+import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.rest.JobRestEndpointFactory;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.function.Supplier;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/** An integration test which recovers from checkpoint after regaining the leadership. */
+public class JobDispatcherITCase extends TestLogger {
+
+    private static final Duration TIMEOUT = Duration.ofMinutes(10);
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+    private Supplier<DispatcherResourceManagerComponentFactory>
+            createJobModeDispatcherResourceManagerComponentFactorySupplier(
+                    Configuration configuration) {
+        return () -> {
+            try {
+                return new DefaultDispatcherResourceManagerComponentFactory(
+                        new DefaultDispatcherRunnerFactory(
+                                JobDispatcherLeaderProcessFactoryFactory.create(
+                                        FileJobGraphRetriever.createFrom(configuration, null))),
+                        StandaloneResourceManagerFactory.getInstance(),
+                        JobRestEndpointFactory.INSTANCE);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        };
+    }
+
+    @Test
+    public void testRecoverFromCheckpointAfterLosingAndRegainingLeadership() throws Exception {
+        final Deadline deadline = Deadline.fromNow(TIMEOUT);
+        final Configuration configuration = new Configuration();
+        configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name());
+        final TestingMiniClusterConfiguration clusterConfiguration =
+                new TestingMiniClusterConfiguration.Builder()
+                        .setConfiguration(configuration)
+                        .build();
+        final EmbeddedHaServicesWithLeadershipControl haServices =
+                new EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor());
+
+        final Configuration newConfiguration =
+                new Configuration(clusterConfiguration.getConfiguration());
+        final long checkpointInterval = 100;
+        final JobID jobID =
+                generateAndPersistJobGraph(
+                        newConfiguration,
+                        checkpointInterval,
+                        TEMPORARY_FOLDER.newFolder().toPath());
+
+        AtLeastOneCheckpointInvokable.reset();
+
+        try (final MiniCluster cluster =
+                new TestingMiniCluster(
+                        clusterConfiguration,
+                        () -> haServices,
+                        createJobModeDispatcherResourceManagerComponentFactorySupplier(
+                                newConfiguration))) {
+            // start mini cluster and submit the job
+            cluster.start();
+
+            AtLeastOneCheckpointInvokable.atLeastOneCheckpointCompleted.await();
+
+            final CompletableFuture<JobResult> firstJobResult = cluster.requestJobResult(jobID);
+            haServices.revokeDispatcherLeadership();
+            // make sure the leadership is revoked to avoid race conditions
+            assertEquals(ApplicationStatus.UNKNOWN, firstJobResult.get().getApplicationStatus());
+
+            haServices.grantDispatcherLeadership();
+
+            // job is suspended, wait until it's running
+            awaitJobStatus(cluster, jobID, JobStatus.RUNNING, deadline);
+
+            assertNotNull(
+                    cluster.getExecutionGraph(jobID)
+                            .get()
+                            .getCheckpointStatsSnapshot()
+                            .getLatestRestoredCheckpoint());
+        }
+    }
+
+    private JobID generateAndPersistJobGraph(
+            Configuration configuration, long checkpointInterval, Path tmpPath) throws Exception {
+        final JobVertex jobVertex = new JobVertex("jobVertex");
+        jobVertex.setInvokableClass(AtLeastOneCheckpointInvokable.class);
+        jobVertex.setParallelism(1);
+
+        final CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration =
+                CheckpointCoordinatorConfiguration.builder()
+                        .setCheckpointInterval(checkpointInterval)
+                        .build();
+        final JobCheckpointingSettings checkpointingSettings =
+                new JobCheckpointingSettings(checkpointCoordinatorConfiguration, null);
+        final JobGraph jobGraph =
+                JobGraphBuilder.newStreamingJobGraphBuilder()
+                        .addJobVertex(jobVertex)
+                        .setJobCheckpointingSettings(checkpointingSettings)
+                        .build();
+
+        final Path jobGraphPath = tmpPath.resolve(JOB_GRAPH_FILE_PATH.defaultValue());
+        try (ObjectOutputStream objectOutputStream =
+                new ObjectOutputStream(Files.newOutputStream(jobGraphPath, CREATE))) {
+            objectOutputStream.writeObject(jobGraph);
+        }
+        configuration.setString(JOB_GRAPH_FILE_PATH.key(), jobGraphPath.toString());
+        return jobGraph.getJobID();
+    }
+
+    private static void awaitJobStatus(
+            MiniCluster cluster, JobID jobId, JobStatus status, Deadline deadline)
+            throws Exception {
+        CommonTestUtils.waitUntilCondition(
+                () -> {
+                    try {
+                        return cluster.getJobStatus(jobId).get() == status;
+                    } catch (ExecutionException e) {
+                        if (ExceptionUtils.findThrowable(e, FlinkJobNotFoundException.class)
+                                .isPresent()) {
+                            // the job may not have been submitted yet
+                            return false;
+                        }
+                        throw e;
+                    }
+                },
+                deadline);
+    }
+
+    /**
+     * An invokable that supports checkpointing and counts down when there is at least one
+     * checkpoint.
+     */
+    public static class AtLeastOneCheckpointInvokable extends AbstractInvokable {
+
+        private static final long CANCEL_SIGNAL = -2L;
+        private final BlockingQueue<Long> checkpointsToConfirm = new ArrayBlockingQueue<>(1);
+
+        private static volatile CountDownLatch atLeastOneCheckpointCompleted;
+
+        private static void reset() {
+            atLeastOneCheckpointCompleted = new CountDownLatch(1);
+        }
+
+        public AtLeastOneCheckpointInvokable(Environment environment) {
+            super(environment);
+        }
+
+        @Override
+        public void invoke() throws Exception {
+            long signal = checkpointsToConfirm.take();
+            while (signal != CANCEL_SIGNAL) {
+                getEnvironment().acknowledgeCheckpoint(signal, new CheckpointMetrics());
+                signal = checkpointsToConfirm.take();
+            }
+        }
+
+        @Override
+        public Future<Void> cancel() throws Exception {
+            // The queue holds at most one element and may still contain a checkpoint id that
+            // invoke() has not taken yet; add() would then throw IllegalStateException and the
+            // cancellation would fail. Drop such a pending id (the task is being cancelled
+            // anyway) so that the cancel signal is always enqueued.
+            while (!checkpointsToConfirm.offer(CANCEL_SIGNAL)) {
+                checkpointsToConfirm.poll();
+            }
+            return FutureUtils.completedVoidFuture();
+        }
+
+        @Override
+        public CompletableFuture<Boolean> triggerCheckpointAsync(
+                CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
+            checkpointsToConfirm.add(checkpointMetaData.getCheckpointId());
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
+            atLeastOneCheckpointCompleted.countDown();
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index ec1e9c5..9420a7c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -178,6 +178,35 @@ public class MiniDispatcherTest extends TestLogger {
}
/**
+ * Tests that in detached mode, the {@link MiniDispatcher} will not
complete the future that
+ * signals job termination if the {@link JobStatus} is not a globally terminal state (e.g. {@code SUSPENDED}).
+ */
+ @Test
+ public void testNotTerminationWithoutGloballyTerminalState() throws
Exception {
+ final MiniDispatcher miniDispatcher =
+ createMiniDispatcher(ClusterEntrypoint.ExecutionMode.DETACHED);
+ miniDispatcher.start();
+
+ try {
+ // wait until the job has been submitted and its JobManagerRunner created
+ final TestingJobManagerRunner testingJobManagerRunner =
+
testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
+
+ testingJobManagerRunner.completeResultFuture(
+ new ExecutionGraphInfo(
+ new ArchivedExecutionGraphBuilder()
+ .setJobID(jobGraph.getJobID())
+ .setState(JobStatus.SUSPENDED)
+ .build()));
+
+ testingJobManagerRunner.getTerminationFuture().get();
+ assertThat(miniDispatcher.getShutDownFuture().isDone(), is(false));
+ } finally {
+ RpcUtils.terminateRpcEndpoint(miniDispatcher, timeout);
+ }
+ }
+
+ /**
* Tests that the {@link MiniDispatcher} only terminates in {@link
* ClusterEntrypoint.ExecutionMode#NORMAL} after it has served the {@link
* org.apache.flink.runtime.jobmaster.JobResult} once.
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
index d0767a6..165c9a3 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
@@ -48,18 +48,33 @@ public class TestingMiniCluster extends MiniCluster {
@Nullable private final Supplier<HighAvailabilityServices>
highAvailabilityServicesSupplier;
+ @Nullable
+ private final Supplier<DispatcherResourceManagerComponentFactory>
+ dispatcherResourceManagerComponentFactorySupplier;
+
public TestingMiniCluster(
TestingMiniClusterConfiguration miniClusterConfiguration,
- @Nullable Supplier<HighAvailabilityServices>
highAvailabilityServicesSupplier) {
+ @Nullable Supplier<HighAvailabilityServices>
highAvailabilityServicesSupplier,
+ @Nullable
+ Supplier<DispatcherResourceManagerComponentFactory>
+ dispatcherResourceManagerComponentFactorySupplier)
{
super(miniClusterConfiguration);
this.numberDispatcherResourceManagerComponents =
miniClusterConfiguration.getNumberDispatcherResourceManagerComponents();
this.highAvailabilityServicesSupplier =
highAvailabilityServicesSupplier;
+ this.dispatcherResourceManagerComponentFactorySupplier =
+ dispatcherResourceManagerComponentFactorySupplier;
this.localCommunication =
miniClusterConfiguration.isLocalCommunication();
}
+ public TestingMiniCluster(
+ TestingMiniClusterConfiguration miniClusterConfiguration,
+ @Nullable Supplier<HighAvailabilityServices>
highAvailabilityServicesSupplier) {
+ this(miniClusterConfiguration, highAvailabilityServicesSupplier, null);
+ }
+
public TestingMiniCluster(TestingMiniClusterConfiguration
miniClusterConfiguration) {
- this(miniClusterConfiguration, null);
+ this(miniClusterConfiguration, null, null);
}
@Override
@@ -78,6 +93,15 @@ public class TestingMiniCluster extends MiniCluster {
}
@Override
+ DispatcherResourceManagerComponentFactory
createDispatcherResourceManagerComponentFactory() {
+ if (dispatcherResourceManagerComponentFactorySupplier != null) {
+ return dispatcherResourceManagerComponentFactorySupplier.get();
+ } else {
+ return super.createDispatcherResourceManagerComponentFactory();
+ }
+ }
+
+ @Override
protected Collection<? extends DispatcherResourceManagerComponent>
createDispatcherResourceManagerComponents(
Configuration configuration,