This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new fe5a171  [FLINK-25486][Runtime/Coordination] Fix the bug that flink 
will lose state when zookeeper leader changes
fe5a171 is described below

commit fe5a1718368e62eb7ac47c00aabbd94173dae668
Author: liujiangang <[email protected]>
AuthorDate: Fri Jan 7 18:28:15 2022 +0800

    [FLINK-25486][Runtime/Coordination] Fix the bug that flink will lose state 
when zookeeper leader changes
    
    This closes #18560.
---
 .../flink/runtime/dispatcher/MiniDispatcher.java   |  21 +-
 .../runtime/dispatcher/JobDispatcherITCase.java    | 248 +++++++++++++++++++++
 .../runtime/dispatcher/MiniDispatcherTest.java     |  29 +++
 .../runtime/minicluster/TestingMiniCluster.java    |  28 ++-
 4 files changed, 318 insertions(+), 8 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
index 83f173d..99bb716 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
@@ -32,6 +33,7 @@ import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.util.FlinkException;
 
 import java.util.Collections;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -102,8 +104,12 @@ public class MiniDispatcher extends Dispatcher {
                                         ? ApplicationStatus.FAILED
                                         : ApplicationStatus.SUCCEEDED;
 
-                        log.info("Shutting down cluster because someone 
retrieved the job result.");
-                        shutDownFuture.complete(status);
+                        if 
(!ApplicationStatus.UNKNOWN.equals(result.getApplicationStatus())) {
+                            log.info(
+                                    "Shutting down cluster because someone 
retrieved the job result"
+                                            + " and the status is globally 
terminal.");
+                            shutDownFuture.complete(status);
+                        }
                     });
         } else {
             log.info("Not shutting down cluster after someone retrieved the 
job result.");
@@ -124,16 +130,19 @@ public class MiniDispatcher extends Dispatcher {
                 executionGraphInfo.getArchivedExecutionGraph();
         final CleanupJobState cleanupHAState = 
super.jobReachedTerminalState(executionGraphInfo);
 
-        if (jobCancelled || executionMode == 
ClusterEntrypoint.ExecutionMode.DETACHED) {
+        JobStatus jobStatus =
+                Objects.requireNonNull(
+                        archivedExecutionGraph.getState(), "JobStatus should 
not be null here.");
+        if (jobStatus.isGloballyTerminalState()
+                && (jobCancelled || executionMode == 
ClusterEntrypoint.ExecutionMode.DETACHED)) {
             // shut down if job is cancelled or we don't have to wait for the 
execution result
             // retrieval
             log.info(
                     "Shutting down cluster with state {}, jobCancelled: {}, 
executionMode: {}",
-                    archivedExecutionGraph.getState(),
+                    jobStatus,
                     jobCancelled,
                     executionMode);
-            shutDownFuture.complete(
-                    
ApplicationStatus.fromJobStatus(archivedExecutionGraph.getState()));
+            
shutDownFuture.complete(ApplicationStatus.fromJobStatus(jobStatus));
         }
 
         return cleanupHAState;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java
new file mode 100644
index 0000000..f34b5ee
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobDispatcherITCase.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import 
org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
+import 
org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory;
+import 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
+import 
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever;
+import org.apache.flink.runtime.execution.Environment;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.rest.JobRestEndpointFactory;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.function.Supplier;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static 
org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/** An integration test which recovers from checkpoint after regaining the 
leadership. */
+public class JobDispatcherITCase extends TestLogger {
+
+    private static final Duration TIMEOUT = Duration.ofMinutes(10);
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+    private Supplier<DispatcherResourceManagerComponentFactory>
+            createJobModeDispatcherResourceManagerComponentFactorySupplier(
+                    Configuration configuration) {
+        return () -> {
+            try {
+                return new DefaultDispatcherResourceManagerComponentFactory(
+                        new DefaultDispatcherRunnerFactory(
+                                
JobDispatcherLeaderProcessFactoryFactory.create(
+                                        
FileJobGraphRetriever.createFrom(configuration, null))),
+                        StandaloneResourceManagerFactory.getInstance(),
+                        JobRestEndpointFactory.INSTANCE);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        };
+    }
+
+    @Test
+    public void testRecoverFromCheckpointAfterLosingAndRegainingLeadership() 
throws Exception {
+        final Deadline deadline = Deadline.fromNow(TIMEOUT);
+        final Configuration configuration = new Configuration();
+        configuration.set(HighAvailabilityOptions.HA_MODE, 
HighAvailabilityMode.ZOOKEEPER.name());
+        final TestingMiniClusterConfiguration clusterConfiguration =
+                new TestingMiniClusterConfiguration.Builder()
+                        .setConfiguration(configuration)
+                        .build();
+        final EmbeddedHaServicesWithLeadershipControl haServices =
+                new 
EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor());
+
+        final Configuration newConfiguration =
+                new Configuration(clusterConfiguration.getConfiguration());
+        final long checkpointInterval = 100;
+        final JobID jobID =
+                generateAndPersistJobGraph(
+                        newConfiguration,
+                        checkpointInterval,
+                        TEMPORARY_FOLDER.newFolder().toPath());
+
+        AtLeastOneCheckpointInvokable.reset();
+
+        try (final MiniCluster cluster =
+                new TestingMiniCluster(
+                        clusterConfiguration,
+                        () -> haServices,
+                        
createJobModeDispatcherResourceManagerComponentFactorySupplier(
+                                newConfiguration))) {
+            // start mini cluster and submit the job
+            cluster.start();
+
+            
AtLeastOneCheckpointInvokable.atLeastOneCheckpointCompleted.await();
+
+            final CompletableFuture<JobResult> firstJobResult = 
cluster.requestJobResult(jobID);
+            haServices.revokeDispatcherLeadership();
+            // make sure the leadership is revoked to avoid race conditions
+            assertEquals(ApplicationStatus.UNKNOWN, 
firstJobResult.get().getApplicationStatus());
+
+            haServices.grantDispatcherLeadership();
+
+            // job is suspended, wait until it's running
+            awaitJobStatus(cluster, jobID, JobStatus.RUNNING, deadline);
+
+            assertNotNull(
+                    cluster.getExecutionGraph(jobID)
+                            .get()
+                            .getCheckpointStatsSnapshot()
+                            .getLatestRestoredCheckpoint());
+        }
+    }
+
+    private JobID generateAndPersistJobGraph(
+            Configuration configuration, long checkpointInterval, Path 
tmpPath) throws Exception {
+        final JobVertex jobVertex = new JobVertex("jobVertex");
+        jobVertex.setInvokableClass(AtLeastOneCheckpointInvokable.class);
+        jobVertex.setParallelism(1);
+
+        final CheckpointCoordinatorConfiguration 
checkpointCoordinatorConfiguration =
+                CheckpointCoordinatorConfiguration.builder()
+                        .setCheckpointInterval(checkpointInterval)
+                        .build();
+        final JobCheckpointingSettings checkpointingSettings =
+                new 
JobCheckpointingSettings(checkpointCoordinatorConfiguration, null);
+        final JobGraph jobGraph =
+                JobGraphBuilder.newStreamingJobGraphBuilder()
+                        .addJobVertex(jobVertex)
+                        .setJobCheckpointingSettings(checkpointingSettings)
+                        .build();
+
+        final Path jobGraphPath = 
tmpPath.resolve(JOB_GRAPH_FILE_PATH.defaultValue());
+        try (ObjectOutputStream objectOutputStream =
+                new ObjectOutputStream(Files.newOutputStream(jobGraphPath, 
CREATE))) {
+            objectOutputStream.writeObject(jobGraph);
+        }
+        configuration.setString(JOB_GRAPH_FILE_PATH.key(), 
jobGraphPath.toString());
+        return jobGraph.getJobID();
+    }
+
+    private static void awaitJobStatus(
+            MiniCluster cluster, JobID jobId, JobStatus status, Deadline 
deadline)
+            throws Exception {
+        CommonTestUtils.waitUntilCondition(
+                () -> {
+                    try {
+                        return cluster.getJobStatus(jobId).get() == status;
+                    } catch (ExecutionException e) {
+                        if (ExceptionUtils.findThrowable(e, 
FlinkJobNotFoundException.class)
+                                .isPresent()) {
+                            // job may not be yet submitted
+                            return false;
+                        }
+                        throw e;
+                    }
+                },
+                deadline);
+    }
+
+    /**
+     * An invokable that supports checkpointing and counts down when there is 
at least one
+     * checkpoint.
+     */
+    public static class AtLeastOneCheckpointInvokable extends 
AbstractInvokable {
+
+        private static final long CANCEL_SIGNAL = -2L;
+        private final BlockingQueue<Long> checkpointsToConfirm = new 
ArrayBlockingQueue<>(1);
+
+        private static volatile CountDownLatch atLeastOneCheckpointCompleted;
+
+        private static void reset() {
+            atLeastOneCheckpointCompleted = new CountDownLatch(1);
+        }
+
+        public AtLeastOneCheckpointInvokable(Environment environment) {
+            super(environment);
+        }
+
+        @Override
+        public void invoke() throws Exception {
+            long signal = checkpointsToConfirm.take();
+            while (signal != CANCEL_SIGNAL) {
+                getEnvironment().acknowledgeCheckpoint(signal, new 
CheckpointMetrics());
+                signal = checkpointsToConfirm.take();
+            }
+        }
+
+        @Override
+        public Future<Void> cancel() throws Exception {
+            checkpointsToConfirm.add(CANCEL_SIGNAL);
+            return FutureUtils.completedVoidFuture();
+        }
+
+        @Override
+        public CompletableFuture<Boolean> triggerCheckpointAsync(
+                CheckpointMetaData checkpointMetaData, CheckpointOptions 
checkpointOptions) {
+            checkpointsToConfirm.add(checkpointMetaData.getCheckpointId());
+            return CompletableFuture.completedFuture(true);
+        }
+
+        @Override
+        public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
+            atLeastOneCheckpointCompleted.countDown();
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
index ec1e9c5..9420a7c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java
@@ -178,6 +178,35 @@ public class MiniDispatcherTest extends TestLogger {
     }
 
     /**
+     * Tests that in detached mode, the {@link MiniDispatcher} will not 
complete the future that
+     * signals job termination if the JobStatus is not globally terminal state.
+     */
+    @Test
+    public void testNotTerminationWithoutGloballyTerminalState() throws 
Exception {
+        final MiniDispatcher miniDispatcher =
+                createMiniDispatcher(ClusterEntrypoint.ExecutionMode.DETACHED);
+        miniDispatcher.start();
+
+        try {
+            // wait until we have submitted the job
+            final TestingJobManagerRunner testingJobManagerRunner =
+                    
testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
+
+            testingJobManagerRunner.completeResultFuture(
+                    new ExecutionGraphInfo(
+                            new ArchivedExecutionGraphBuilder()
+                                    .setJobID(jobGraph.getJobID())
+                                    .setState(JobStatus.SUSPENDED)
+                                    .build()));
+
+            testingJobManagerRunner.getTerminationFuture().get();
+            assertThat(miniDispatcher.getShutDownFuture().isDone(), is(false));
+        } finally {
+            RpcUtils.terminateRpcEndpoint(miniDispatcher, timeout);
+        }
+    }
+
+    /**
      * Tests that the {@link MiniDispatcher} only terminates in {@link
      * ClusterEntrypoint.ExecutionMode#NORMAL} after it has served the {@link
      * org.apache.flink.runtime.jobmaster.JobResult} once.
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
index d0767a6..165c9a3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
@@ -48,18 +48,33 @@ public class TestingMiniCluster extends MiniCluster {
 
     @Nullable private final Supplier<HighAvailabilityServices> 
highAvailabilityServicesSupplier;
 
+    @Nullable
+    private final Supplier<DispatcherResourceManagerComponentFactory>
+            dispatcherResourceManagerComponentFactorySupplier;
+
     public TestingMiniCluster(
             TestingMiniClusterConfiguration miniClusterConfiguration,
-            @Nullable Supplier<HighAvailabilityServices> 
highAvailabilityServicesSupplier) {
+            @Nullable Supplier<HighAvailabilityServices> 
highAvailabilityServicesSupplier,
+            @Nullable
+                    Supplier<DispatcherResourceManagerComponentFactory>
+                            dispatcherResourceManagerComponentFactorySupplier) 
{
         super(miniClusterConfiguration);
         this.numberDispatcherResourceManagerComponents =
                 
miniClusterConfiguration.getNumberDispatcherResourceManagerComponents();
         this.highAvailabilityServicesSupplier = 
highAvailabilityServicesSupplier;
+        this.dispatcherResourceManagerComponentFactorySupplier =
+                dispatcherResourceManagerComponentFactorySupplier;
         this.localCommunication = 
miniClusterConfiguration.isLocalCommunication();
     }
 
+    public TestingMiniCluster(
+            TestingMiniClusterConfiguration miniClusterConfiguration,
+            @Nullable Supplier<HighAvailabilityServices> 
highAvailabilityServicesSupplier) {
+        this(miniClusterConfiguration, highAvailabilityServicesSupplier, null);
+    }
+
     public TestingMiniCluster(TestingMiniClusterConfiguration 
miniClusterConfiguration) {
-        this(miniClusterConfiguration, null);
+        this(miniClusterConfiguration, null, null);
     }
 
     @Override
@@ -78,6 +93,15 @@ public class TestingMiniCluster extends MiniCluster {
     }
 
     @Override
+    DispatcherResourceManagerComponentFactory 
createDispatcherResourceManagerComponentFactory() {
+        if (dispatcherResourceManagerComponentFactorySupplier != null) {
+            return dispatcherResourceManagerComponentFactorySupplier.get();
+        } else {
+            return super.createDispatcherResourceManagerComponentFactory();
+        }
+    }
+
+    @Override
     protected Collection<? extends DispatcherResourceManagerComponent>
             createDispatcherResourceManagerComponents(
                     Configuration configuration,

Reply via email to