This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8ba13f37afb9164f3bb17de78c4b0d85b1633638 Author: liujiangang <liujiang...@kuaishou.com> AuthorDate: Fri Jan 7 18:28:15 2022 +0800 [FLINK-25486][Runtime/Coordination] Fix the bug that Flink loses state when the ZooKeeper leader changes This closes #18296. --- .../flink/runtime/dispatcher/MiniDispatcher.java | 21 ++- .../runtime/dispatcher/JobDispatcherITCase.java | 210 +++++++++++++++++++++ .../runtime/dispatcher/MiniDispatcherTest.java | 33 ++++ 3 files changed, 258 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java index bc5776e..76afe73 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; @@ -34,6 +35,7 @@ import org.apache.flink.util.FlinkException; import javax.annotation.Nullable; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -106,8 +108,12 @@ public class MiniDispatcher extends Dispatcher { ? 
ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED; - log.info("Shutting down cluster because someone retrieved the job result."); - shutDownFuture.complete(status); + if (!ApplicationStatus.UNKNOWN.equals(result.getApplicationStatus())) { + log.info( + "Shutting down cluster because someone retrieved the job result" + + " and the status is globally terminal."); + shutDownFuture.complete(status); + } }); } else { log.info("Not shutting down cluster after someone retrieved the job result."); @@ -128,16 +134,19 @@ public class MiniDispatcher extends Dispatcher { executionGraphInfo.getArchivedExecutionGraph(); final CleanupJobState cleanupHAState = super.jobReachedTerminalState(executionGraphInfo); - if (jobCancelled || executionMode == ClusterEntrypoint.ExecutionMode.DETACHED) { + JobStatus jobStatus = + Objects.requireNonNull( + archivedExecutionGraph.getState(), "JobStatus should not be null here."); + if (jobStatus.isGloballyTerminalState() + && (jobCancelled || executionMode == ClusterEntrypoint.ExecutionMode.DETACHED)) { // shut down if job is cancelled or we don't have to wait for the execution result // retrieval log.info( "Shutting down cluster with state {}, jobCancelled: {}, executionMode: {}", - archivedExecutionGraph.getState(), + jobStatus, jobCancelled, executionMode); - shutDownFuture.complete( - ApplicationStatus.fromJobStatus(archivedExecutionGraph.getState())); + shutDownFuture.complete(ApplicationStatus.fromJobStatus(jobStatus)); } return cleanupHAState; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java new file mode 100644 index 0000000..f565960 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory; +import org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory; +import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever; +import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphBuilder; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; +import 
org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.TestingMiniCluster; +import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration; +import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory; +import org.apache.flink.runtime.rest.JobRestEndpointFactory; +import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; + +import static java.nio.file.StandardOpenOption.CREATE; +import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH; +import static org.junit.Assert.assertNotNull; + +/** An integration test which recovers from checkpoint after regaining the leadership. 
*/ +public class JobDispatcherITCase extends TestLogger { + + private static final Duration TIMEOUT = Duration.ofMinutes(10); + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private Supplier<DispatcherResourceManagerComponentFactory> + createJobModeDispatcherResourceManagerComponentFactorySupplier( + Configuration configuration) { + return () -> { + try { + return new DefaultDispatcherResourceManagerComponentFactory( + new DefaultDispatcherRunnerFactory( + JobDispatcherLeaderProcessFactoryFactory.create( + FileJobGraphRetriever.createFrom(configuration, null))), + StandaloneResourceManagerFactory.getInstance(), + JobRestEndpointFactory.INSTANCE); + } catch (IOException e) { + throw new RuntimeException(e); + } + }; + } + + @Test + public void testRecoverFromCheckpointAfterLosingAndRegainingLeadership() throws Exception { + final Deadline deadline = Deadline.fromNow(TIMEOUT); + final Configuration configuration = new Configuration(); + configuration.set(HighAvailabilityOptions.HA_MODE, HighAvailabilityMode.ZOOKEEPER.name()); + final TestingMiniClusterConfiguration clusterConfiguration = + TestingMiniClusterConfiguration.newBuilder() + .setConfiguration(configuration) + .build(); + final EmbeddedHaServicesWithLeadershipControl haServices = + new EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor()); + + Configuration newConfiguration = new Configuration(clusterConfiguration.getConfiguration()); + long checkpointInterval = 500; + JobID jobID = generateJobGraph(newConfiguration, checkpointInterval); + + final TestingMiniCluster.Builder clusterBuilder = + TestingMiniCluster.newBuilder(clusterConfiguration) + .setHighAvailabilityServicesSupplier(() -> haServices) + .setDispatcherResourceManagerComponentFactorySupplier( + createJobModeDispatcherResourceManagerComponentFactorySupplier( + newConfiguration)); + + try (final MiniCluster cluster = clusterBuilder.build()) { + // start mini cluster and submit the 
job + cluster.start(); + + // wait until job is running + awaitJobStatus(cluster, jobID, JobStatus.RUNNING, deadline); + CommonTestUtils.waitForAllTaskRunning(cluster, jobID, false); + CommonTestUtils.waitUntilCondition( + () -> queryCompletedCheckpoints(cluster, jobID) > 0L, + Deadline.fromNow(Duration.ofSeconds(30)), + checkpointInterval / 2); + + final CompletableFuture<JobResult> firstJobResult = cluster.requestJobResult(jobID); + haServices.revokeDispatcherLeadership(); + // make sure the leadership is revoked to avoid race conditions + Assertions.assertEquals( + ApplicationStatus.UNKNOWN, firstJobResult.get().getApplicationStatus()); + + haServices.grantDispatcherLeadership(); + + // job is suspended, wait until it's running + awaitJobStatus(cluster, jobID, JobStatus.RUNNING, deadline); + + assertNotNull( + cluster.getArchivedExecutionGraph(jobID) + .get() + .getCheckpointStatsSnapshot() + .getLatestRestoredCheckpoint()); + + cluster.cancelJob(jobID); + + // the cluster should shut down automatically once the application completes + CommonTestUtils.waitUntilCondition(() -> !cluster.isRunning(), deadline); + } + } + + private JobID generateJobGraph(Configuration configuration, long checkpointInterval) + throws Exception { + final JobVertex jobVertex = new JobVertex("jobVertex"); + jobVertex.setInvokableClass( + AdaptiveSchedulerClusterITCase.CheckpointingNoOpInvokable.class); + jobVertex.setParallelism(1); + + final CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = + CheckpointCoordinatorConfiguration.builder() + .setCheckpointInterval(checkpointInterval) + .build(); + final JobCheckpointingSettings checkpointingSettings = + new JobCheckpointingSettings(checkpointCoordinatorConfiguration, null); + JobGraph jobGraph = + JobGraphBuilder.newStreamingJobGraphBuilder() + .addJobVertex(jobVertex) + .setJobCheckpointingSettings(checkpointingSettings) + .build(); + + final Path jobGraphPath = + 
TEMPORARY_FOLDER.newFile(JOB_GRAPH_FILE_PATH.defaultValue()).toPath(); + try (ObjectOutputStream objectOutputStream = + new ObjectOutputStream(Files.newOutputStream(jobGraphPath, CREATE))) { + objectOutputStream.writeObject(jobGraph); + } + configuration.setString(JOB_GRAPH_FILE_PATH.key(), jobGraphPath.toString()); + return jobGraph.getJobID(); + } + + private long queryCompletedCheckpoints(MiniCluster miniCluster, JobID jobID) + throws InterruptedException, ExecutionException { + return miniCluster + .getArchivedExecutionGraph(jobID) + .get() + .getCheckpointStatsSnapshot() + .getCounts() + .getNumberOfCompletedCheckpoints(); + } + + private static void awaitJobStatus( + MiniCluster cluster, JobID jobId, JobStatus status, Deadline deadline) + throws Exception { + CommonTestUtils.waitUntilCondition( + () -> { + try { + return cluster.getJobStatus(jobId).get() == status; + } catch (ExecutionException e) { + if (ExceptionUtils.findThrowable(e, FlinkJobNotFoundException.class) + .isPresent()) { + // job may not be yet submitted + return false; + } + throw e; + } + }, + deadline); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java index af40e71..811f303 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java @@ -58,11 +58,13 @@ import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; /** Tests for the {@link MiniDispatcher}. 
*/ public class MiniDispatcherTest extends TestLogger { @@ -202,6 +204,37 @@ public class MiniDispatcherTest extends TestLogger { } /** + * Tests that in detached mode, the {@link MiniDispatcher} will not complete the future that + * signals job termination if the JobStatus is not globally terminal state. + */ + @Test + public void testNotTerminationWithoutGloballyTerminalState() throws Exception { + final MiniDispatcher miniDispatcher = + createMiniDispatcher(ClusterEntrypoint.ExecutionMode.DETACHED); + miniDispatcher.start(); + + try { + // wait until we have submitted the job + final TestingJobManagerRunner testingJobManagerRunner = + testingJobManagerRunnerFactory.takeCreatedJobManagerRunner(); + + testingJobManagerRunner.completeResultFuture( + new ExecutionGraphInfo( + new ArchivedExecutionGraphBuilder() + .setJobID(jobGraph.getJobID()) + .setState(JobStatus.SUSPENDED) + .build())); + + miniDispatcher.getShutDownFuture().get(3, TimeUnit.SECONDS); + fail("The shutDownFuture should not be done."); + } catch (TimeoutException ignored) { + + } finally { + RpcUtils.terminateRpcEndpoint(miniDispatcher, timeout); + } + } + + /** * Tests that the {@link MiniDispatcher} only terminates in {@link * ClusterEntrypoint.ExecutionMode#NORMAL} after it has served the {@link * org.apache.flink.runtime.jobmaster.JobResult} once.