This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 0be2a28  [FLINK-26783] Do not trigger global failover if failed during 
committing side-effects during stop-with-savepoint
0be2a28 is described below

commit 0be2a28d1d1eae03a8cf9d64ab3e7d68f5d87b64
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Tue Mar 22 11:44:13 2022 +0100

    [FLINK-26783] Do not trigger global failover if failed during committing 
side-effects during stop-with-savepoint
    
    If a job fails while committing side effects of a stop-with-savepoint and 
we restart from the latest checkpoint instead of the savepoint, we might end up 
producing duplicates. On the other hand, if we restart from the savepoint, we 
end up in a situation where a running Flink job depends on the existence of the 
savepoint.
    
    With this commit we no longer fail over (restart) the job if the savepoint 
completed successfully but the job failed while committing side effects. 
Instead, the failure is reported as non-recoverable, so the job fails, and we 
complete the future exceptionally with an exception explaining that the 
savepoint is consistent but might have uncommitted side effects. Users who want 
to commit those side effects are asked to manually restart a job from that 
savepoint.
    
    This closes #19198
---
 .../StopWithSavepointStoppingException.java        |  51 ++++++++++
 .../StopWithSavepointTerminationHandlerImpl.java   |  14 +--
 ...topWithSavepointTerminationHandlerImplTest.java |  20 +---
 .../flink/test/checkpointing/SavepointITCase.java  | 109 ++++++++++++++++++++-
 4 files changed, 162 insertions(+), 32 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointStoppingException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointStoppingException.java
new file mode 100644
index 0000000..d41dbc8
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointStoppingException.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.stopwithsavepoint;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.throwable.ThrowableAnnotation;
+import org.apache.flink.runtime.throwable.ThrowableType;
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Exception thrown when a savepoint has been created successfully while stopping a job with
+ * savepoint, but the job has not finished. In that case, side effects might not have been
+ * committed. This exception is used to communicate that to the user.
+ *
+ * <p>The exception is marked as a {@link ThrowableType#NonRecoverableError}. Its message points
+ * the user to the created savepoint, from which a job can be restarted to commit the side effects.
+ */
+@Experimental
+@ThrowableAnnotation(ThrowableType.NonRecoverableError)
+public class StopWithSavepointStoppingException extends FlinkException {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Path of the savepoint that was created successfully before the job failed. */
+    private final String savepointPath;
+
+    /**
+     * Creates an exception for a stop-with-savepoint operation whose savepoint completed, but
+     * whose job failed while stopping.
+     *
+     * @param savepointPath path of the successfully created savepoint
+     * @param jobID id of the job that failed during stopping
+     */
+    public StopWithSavepointStoppingException(String savepointPath, JobID jobID) {
+        super(
+                String.format(
+                        "A savepoint has been created at: %s, but the corresponding job %s failed "
+                                + "during stopping. The savepoint is consistent, but might have "
+                                + "uncommitted transactions. If you want to commit the transaction "
+                                + "please restart a job from this savepoint.",
+                        savepointPath, jobID));
+        this.savepointPath = savepointPath;
+    }
+
+    /**
+     * Returns the path of the savepoint that was created before the job failed.
+     *
+     * @return path of the consistent savepoint
+     */
+    public String getSavepointPath() {
+        return savepointPath;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImpl.java
index aec32d3..f341613 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImpl.java
@@ -24,7 +24,6 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointScheduling;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.scheduler.SchedulerNG;
-import org.apache.flink.util.FlinkException;
 
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -167,16 +166,13 @@ public class StopWithSavepointTerminationHandlerImpl
      */
     private void terminateExceptionallyWithGlobalFailover(
             Iterable<ExecutionState> unfinishedExecutionStates, String 
savepointPath) {
-        String errorMessage =
-                String.format(
-                        "Inconsistent execution state after stopping with 
savepoint. At least one execution is still in one of the following states: %s. 
A global fail-over is triggered to recover the job %s.",
-                        StringUtils.join(unfinishedExecutionStates, ", "), 
jobId);
-        FlinkException inconsistentFinalStateException = new 
FlinkException(errorMessage);
+        StopWithSavepointStoppingException inconsistentFinalStateException =
+                new StopWithSavepointStoppingException(savepointPath, jobId);
 
         log.warn(
-                "A savepoint was created at {} but the corresponding job {} 
didn't terminate successfully.",
-                savepointPath,
-                jobId,
+                "Inconsistent execution state after stopping with savepoint. 
At least one"
+                        + " execution is still in one of the following states: 
{}.",
+                StringUtils.join(unfinishedExecutionStates, ", "),
                 inconsistentFinalStateException);
 
         scheduler.handleGlobalFailure(inconsistentFinalStateException);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java
index cab4abe..f0c2bee 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.scheduler.stopwithsavepoint;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.execution.SavepointFormatType;
-import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.TestingCheckpointScheduling;
@@ -31,7 +30,6 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
 import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -152,10 +150,6 @@ public class StopWithSavepointTerminationHandlerImplTest 
extends TestLogger {
                 createTestInstance(globalFailOverTriggered::complete);
 
         final ExecutionState expectedNonFinishedState = ExecutionState.FAILED;
-        final String expectedErrorMessage =
-                String.format(
-                        "Inconsistent execution state after stopping with 
savepoint. At least one execution is still in one of the following states: %s. 
A global fail-over is triggered to recover the job %s.",
-                        expectedNonFinishedState, JOB_ID);
 
         final EmptyStreamStateHandle streamStateHandle = new 
EmptyStreamStateHandle();
         final CompletedCheckpoint completedSavepoint = 
createCompletedSavepoint(streamStateHandle);
@@ -168,24 +162,14 @@ public class StopWithSavepointTerminationHandlerImplTest 
extends TestLogger {
             testInstance.getSavepointPath().get();
             fail("An ExecutionException is expected.");
         } catch (Throwable e) {
-            final Optional<FlinkException> actualFlinkException =
-                    ExceptionUtils.findThrowable(e, FlinkException.class);
+            final Optional<StopWithSavepointStoppingException> 
actualFlinkException =
+                    ExceptionUtils.findThrowable(e, 
StopWithSavepointStoppingException.class);
             assertTrue(
                     "A FlinkException should have been thrown.", 
actualFlinkException.isPresent());
-            assertThat(
-                    actualFlinkException.get(),
-                    FlinkMatchers.containsMessage(expectedErrorMessage));
         }
 
         assertTrue("Global fail-over was not triggered.", 
globalFailOverTriggered.isDone());
-        assertThat(
-                globalFailOverTriggered.get(), 
FlinkMatchers.containsMessage(expectedErrorMessage));
-
         assertFalse("Savepoint should not be discarded.", 
streamStateHandle.isDisposed());
-
-        assertFalse(
-                "Checkpoint scheduling should not be enabled in case of 
failure.",
-                checkpointScheduling.isEnabled());
     }
 
     @Test(expected = UnsupportedOperationException.class)
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index acc0ddd..a318136 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -59,9 +59,11 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
+import 
org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -143,10 +145,12 @@ import static 
org.apache.flink.util.ExceptionUtils.assertThrowable;
 import static org.apache.flink.util.ExceptionUtils.assertThrowableWithMessage;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -292,6 +296,79 @@ public class SavepointITCase extends TestLogger {
         public void initializeState(FunctionInitializationContext context) 
throws Exception {}
     }
 
+    /**
+     * Verifies that if a task fails while being notified about the completed stop-with-savepoint
+     * savepoint, the stop-with-savepoint future fails with a {@link
+     * StopWithSavepointStoppingException} and the job ends up in {@link JobStatus#FAILED}, even
+     * though a restart strategy is configured.
+     */
+    @Test
+    public void testStopWithSavepointFailsOverToSavepoint() throws Throwable {
+        final int sinkParallelism = 5;
+        MiniClusterWithClientResource cluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setNumberSlotsPerTaskManager(sinkParallelism + 1)
+                                .build());
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10));
+        env.setParallelism(1);
+        env.addSource(new InfiniteTestSource())
+                .name("Infinite Source")
+                // Periodic checkpointing is not enabled here, so the manually triggered checkpoint
+                // below is expected to get id 1 and the stop-with-savepoint id 2.
+                .map(new FailingOnCompletedSavepointMapFunction(2))
+                .addSink(new DiscardingSink<>())
+                // different parallelism to break chaining and add some concurrent tasks
+                .setParallelism(sinkParallelism);
+
+        final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+
+        cluster.before();
+        try {
+            ClusterClient<?> client = cluster.getClusterClient();
+            client.submitJob(jobGraph).get();
+            waitUntilAllTasksAreRunning(cluster.getRestClusterClient(), jobGraph.getJobID());
+
+            // Presumably ensures a completed checkpoint exists that a failover could fall back to.
+            cluster.getMiniCluster().triggerCheckpoint(jobGraph.getJobID()).get();
+            final CompletableFuture<String> savepointCompleted =
+                    client.stopWithSavepoint(
+                            jobGraph.getJobID(),
+                            true,
+                            savepointDir.getAbsolutePath(),
+                            SavepointFormatType.CANONICAL);
+
+            final Throwable savepointException =
+                    assertThrows(ExecutionException.class, savepointCompleted::get).getCause();
+            assertThrowable(
+                    savepointException,
+                    throwable ->
+                            throwable instanceof StopWithSavepointStoppingException
+                                    && throwable
+                                            .getMessage()
+                                            .startsWith("A savepoint has been created at: "));
+            assertThat(client.getJobStatus(jobGraph.getJobID()).get(), is(JobStatus.FAILED));
+        } finally {
+            cluster.after();
+        }
+    }
+
+    /**
+     * Identity map function that throws an {@link ExpectedTestException} when it is notified
+     * about the completion of the checkpoint with the configured id.
+     */
+    private static final class FailingOnCompletedSavepointMapFunction
+            extends RichMapFunction<Integer, Integer> implements CheckpointListener {
+
+        /** Id of the checkpoint whose completion notification makes the task fail. */
+        private final long savepointId;
+
+        private FailingOnCompletedSavepointMapFunction(long savepointId) {
+            this.savepointId = savepointId;
+        }
+
+        @Override
+        public Integer map(Integer value) throws Exception {
+            // Identity mapping: records pass through unchanged.
+            return value;
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) throws Exception {
+            // Ignore every notification except the one for the configured checkpoint.
+            if (checkpointId != savepointId) {
+                return;
+            }
+            throw new ExpectedTestException();
+        }
+    }
+
     /**
      * Triggers a savepoint for a job that uses the FsStateBackend. We expect 
that all checkpoint
      * files are written to a new savepoint directory.
@@ -924,7 +1001,8 @@ public class SavepointITCase extends TestLogger {
                 // 1. task failure restart
                 // 2. job failover triggered by the CheckpointFailureManager
                 2,
-                assertInSnapshotCreationFailure());
+                assertInSnapshotCreationFailure(),
+                true);
     }
 
     @Test
@@ -937,8 +1015,26 @@ public class SavepointITCase extends TestLogger {
                 // two restarts expected:
                 // 1. task failure restart
                 // 2. job failover triggered by SchedulerBase.stopWithSavepoint
-                2,
-                assertAfterSnapshotCreationFailure());
+                0,
+                (jobId, actualException) -> {
+                    if (ClusterOptions.isAdaptiveSchedulerEnabled(new 
Configuration())) {
+                        return actualException
+                                .getMessage()
+                                .contains("Stop with savepoint operation could 
not be completed");
+                    } else {
+                        Optional<StopWithSavepointStoppingException> 
actualFlinkException =
+                                findThrowable(
+                                        actualException, 
StopWithSavepointStoppingException.class);
+                        return actualFlinkException
+                                .map(
+                                        e ->
+                                                e.getMessage()
+                                                        .startsWith(
+                                                                "A savepoint 
has been created at:"))
+                                .orElse(false);
+                    }
+                },
+                false);
     }
 
     @Test
@@ -1051,7 +1147,8 @@ public class SavepointITCase extends TestLogger {
             InfiniteTestSource failingSource,
             File savepointDir,
             int expectedMaximumNumberOfRestarts,
-            BiFunction<JobID, ExecutionException, Boolean> exceptionAssertion)
+            BiFunction<JobID, ExecutionException, Boolean> exceptionAssertion,
+            boolean shouldRestart)
             throws Exception {
         MiniClusterWithClientResource cluster =
                 new MiniClusterWithClientResource(
@@ -1107,7 +1204,9 @@ public class SavepointITCase extends TestLogger {
                 assertThrowable(e, ex -> 
exceptionAssertion.apply(jobGraph.getJobID(), e));
             }
 
-            waitUntilAllTasksAreRunning(cluster.getRestClusterClient(), 
jobGraph.getJobID());
+            if (shouldRestart) {
+                waitUntilAllTasksAreRunning(cluster.getRestClusterClient(), 
jobGraph.getJobID());
+            }
         } finally {
             cluster.after();
         }

Reply via email to