This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new 0be2a28 [FLINK-26783] Do not trigger global failover if failed during
committing side-effects during stop-with-savepoint
0be2a28 is described below
commit 0be2a28d1d1eae03a8cf9d64ab3e7d68f5d87b64
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Tue Mar 22 11:44:13 2022 +0100
[FLINK-26783] Do not trigger global failover if failed during committing
side-effects during stop-with-savepoint
If a job fails while committing side effects of the stop-with-savepoint and
we restart from the latest checkpoint instead of the savepoint, we might end up
producing duplicates. On the other hand, if we restart from the savepoint, we end
up in a situation where a running Flink job depends on the existence of the
savepoint.
With this commit, we no longer trigger a global failover if the savepoint
completed successfully but the job failed while committing side effects. In
that case, we complete the future exceptionally with an exception explaining
that the savepoint is consistent but might have uncommitted side effects, and
asking users to manually restart the job from that savepoint if they want to
commit those side effects.
This closes #19198
---
.../StopWithSavepointStoppingException.java | 51 ++++++++++
.../StopWithSavepointTerminationHandlerImpl.java | 14 +--
...topWithSavepointTerminationHandlerImplTest.java | 20 +---
.../flink/test/checkpointing/SavepointITCase.java | 109 ++++++++++++++++++++-
4 files changed, 162 insertions(+), 32 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointStoppingException.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointStoppingException.java
new file mode 100644
index 0000000..d41dbc8
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointStoppingException.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.stopwithsavepoint;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.throwable.ThrowableAnnotation;
+import org.apache.flink.runtime.throwable.ThrowableType;
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Exception thrown when a savepoint has been created successfully when stopping with savepoint, but
+ * the job has not finished. In that case side-effects might not have been committed. This exception
+ * is used to communicate that to the user.
+ *
+ * <p>The exception is annotated as a {@link ThrowableType#NonRecoverableError}, so that the job is
+ * not restarted: restarting from the latest checkpoint could produce duplicates, while restarting
+ * from the savepoint would make a running job depend on the existence of that savepoint.
+ */
+@Experimental
+@ThrowableAnnotation(ThrowableType.NonRecoverableError)
+public class StopWithSavepointStoppingException extends FlinkException {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Path of the savepoint that was completed before the job failed. */
+    private final String savepointPath;
+
+    /**
+     * Creates an exception for a completed savepoint whose job failed while stopping.
+     *
+     * @param savepointPath the path of the successfully created savepoint
+     * @param jobID the id of the job that failed during stopping
+     */
+    public StopWithSavepointStoppingException(String savepointPath, JobID jobID) {
+        super(
+                String.format(
+                        "A savepoint has been created at: %s, but the corresponding job %s failed "
+                                + "during stopping. The savepoint is consistent, but might have "
+                                + "uncommitted transactions. If you want to commit the transaction "
+                                + "please restart a job from this savepoint.",
+                        savepointPath, jobID));
+        this.savepointPath = savepointPath;
+    }
+
+    /**
+     * Returns the path of the savepoint that was created before the job failed.
+     *
+     * @return the savepoint path passed to the constructor
+     */
+    public String getSavepointPath() {
+        return savepointPath;
+    }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImpl.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImpl.java
index aec32d3..f341613 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImpl.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImpl.java
@@ -24,7 +24,6 @@ import
org.apache.flink.runtime.checkpoint.CheckpointScheduling;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.scheduler.SchedulerNG;
-import org.apache.flink.util.FlinkException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -167,16 +166,13 @@ public class StopWithSavepointTerminationHandlerImpl
*/
private void terminateExceptionallyWithGlobalFailover(
Iterable<ExecutionState> unfinishedExecutionStates, String
savepointPath) {
- String errorMessage =
- String.format(
- "Inconsistent execution state after stopping with
savepoint. At least one execution is still in one of the following states: %s.
A global fail-over is triggered to recover the job %s.",
- StringUtils.join(unfinishedExecutionStates, ", "),
jobId);
- FlinkException inconsistentFinalStateException = new
FlinkException(errorMessage);
+ StopWithSavepointStoppingException inconsistentFinalStateException =
+ new StopWithSavepointStoppingException(savepointPath, jobId);
log.warn(
- "A savepoint was created at {} but the corresponding job {}
didn't terminate successfully.",
- savepointPath,
- jobId,
+ "Inconsistent execution state after stopping with savepoint.
At least one"
+ + " execution is still in one of the following states:
{}.",
+ StringUtils.join(unfinishedExecutionStates, ", "),
inconsistentFinalStateException);
scheduler.handleGlobalFailure(inconsistentFinalStateException);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java
index cab4abe..f0c2bee 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/stopwithsavepoint/StopWithSavepointTerminationHandlerImplTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.scheduler.stopwithsavepoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.execution.SavepointFormatType;
-import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.TestingCheckpointScheduling;
@@ -31,7 +30,6 @@ import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
import
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -152,10 +150,6 @@ public class StopWithSavepointTerminationHandlerImplTest
extends TestLogger {
createTestInstance(globalFailOverTriggered::complete);
final ExecutionState expectedNonFinishedState = ExecutionState.FAILED;
- final String expectedErrorMessage =
- String.format(
- "Inconsistent execution state after stopping with
savepoint. At least one execution is still in one of the following states: %s.
A global fail-over is triggered to recover the job %s.",
- expectedNonFinishedState, JOB_ID);
final EmptyStreamStateHandle streamStateHandle = new
EmptyStreamStateHandle();
final CompletedCheckpoint completedSavepoint =
createCompletedSavepoint(streamStateHandle);
@@ -168,24 +162,14 @@ public class StopWithSavepointTerminationHandlerImplTest
extends TestLogger {
testInstance.getSavepointPath().get();
fail("An ExecutionException is expected.");
} catch (Throwable e) {
- final Optional<FlinkException> actualFlinkException =
- ExceptionUtils.findThrowable(e, FlinkException.class);
+ final Optional<StopWithSavepointStoppingException>
actualFlinkException =
+ ExceptionUtils.findThrowable(e,
StopWithSavepointStoppingException.class);
assertTrue(
"A FlinkException should have been thrown.",
actualFlinkException.isPresent());
- assertThat(
- actualFlinkException.get(),
- FlinkMatchers.containsMessage(expectedErrorMessage));
}
assertTrue("Global fail-over was not triggered.",
globalFailOverTriggered.isDone());
- assertThat(
- globalFailOverTriggered.get(),
FlinkMatchers.containsMessage(expectedErrorMessage));
-
assertFalse("Savepoint should not be discarded.",
streamStateHandle.isDisposed());
-
- assertFalse(
- "Checkpoint scheduling should not be enabled in case of
failure.",
- checkpointScheduling.isEnabled());
}
@Test(expected = UnsupportedOperationException.class)
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index acc0ddd..a318136 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -59,9 +59,11 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
+import
org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -143,10 +145,12 @@ import static
org.apache.flink.util.ExceptionUtils.assertThrowable;
import static org.apache.flink.util.ExceptionUtils.assertThrowableWithMessage;
import static org.apache.flink.util.ExceptionUtils.findThrowable;
import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
+import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -292,6 +296,79 @@ public class SavepointITCase extends TestLogger {
public void initializeState(FunctionInitializationContext context)
throws Exception {}
}
+    /**
+     * Verifies that a failure while committing side effects of a stop-with-savepoint (i.e. after the
+     * savepoint itself completed) does not restart the job. Instead, the job ends up FAILED and the
+     * stop-with-savepoint future is completed with a {@link StopWithSavepointStoppingException}.
+     */
+    @Test
+    public void testStopWithSavepointFailsOverToSavepoint() throws Throwable {
+        final int sinkParallelism = 5;
+        final MiniClusterWithClientResource cluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setNumberSlotsPerTaskManager(sinkParallelism + 1)
+                                .build());
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10));
+        env.setParallelism(1);
+        env.addSource(new InfiniteTestSource())
+                .name("Infinite Source")
+                // the single explicit checkpoint triggered below is expected to get id 1, which
+                // makes the stop-with-savepoint checkpoint id 2
+                .map(new FailingOnCompletedSavepointMapFunction(2))
+                .addSink(new DiscardingSink<>())
+                // different parallelism to break chaining and add some concurrent tasks
+                .setParallelism(sinkParallelism);
+
+        final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+
+        cluster.before();
+        try {
+            final ClusterClient<?> client = cluster.getClusterClient();
+            client.submitJob(jobGraph).get();
+            waitUntilAllTasksAreRunning(cluster.getRestClusterClient(), jobGraph.getJobID());
+
+            cluster.getMiniCluster().triggerCheckpoint(jobGraph.getJobID()).get();
+            final CompletableFuture<String> savepointCompleted =
+                    client.stopWithSavepoint(
+                            jobGraph.getJobID(),
+                            true,
+                            savepointDir.getAbsolutePath(),
+                            SavepointFormatType.CANONICAL);
+
+            final Throwable savepointException =
+                    assertThrows(ExecutionException.class, savepointCompleted::get).getCause();
+            assertThrowable(
+                    savepointException,
+                    throwable ->
+                            throwable instanceof StopWithSavepointStoppingException
+                                    && throwable
+                                            .getMessage()
+                                            .startsWith("A savepoint has been created at: "));
+
+            // the stop-with-savepoint future may complete before the job reaches its terminal
+            // state, so wait for the job result before asserting on the final status
+            client.requestJobResult(jobGraph.getJobID()).get();
+            assertThat(client.getJobStatus(jobGraph.getJobID()).get(), is(JobStatus.FAILED));
+        } finally {
+            cluster.after();
+        }
+    }
+
+    /**
+     * Pass-through map function that throws an {@link ExpectedTestException} when it is notified
+     * that the checkpoint with the configured id has completed.
+     */
+    private static final class FailingOnCompletedSavepointMapFunction
+            extends RichMapFunction<Integer, Integer> implements CheckpointListener {
+
+        /** Id of the completed checkpoint whose notification triggers the failure. */
+        private final long failingCheckpointId;
+
+        private FailingOnCompletedSavepointMapFunction(long failingCheckpointId) {
+            this.failingCheckpointId = failingCheckpointId;
+        }
+
+        @Override
+        public Integer map(Integer element) throws Exception {
+            // records are forwarded unchanged
+            return element;
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) throws Exception {
+            if (checkpointId != failingCheckpointId) {
+                return;
+            }
+            throw new ExpectedTestException();
+        }
+    }
+
/**
* Triggers a savepoint for a job that uses the FsStateBackend. We expect
that all checkpoint
* files are written to a new savepoint directory.
@@ -924,7 +1001,8 @@ public class SavepointITCase extends TestLogger {
// 1. task failure restart
// 2. job failover triggered by the CheckpointFailureManager
2,
- assertInSnapshotCreationFailure());
+ assertInSnapshotCreationFailure(),
+ true);
}
@Test
@@ -937,8 +1015,26 @@ public class SavepointITCase extends TestLogger {
// two restarts expected:
// 1. task failure restart
// 2. job failover triggered by SchedulerBase.stopWithSavepoint
- 2,
- assertAfterSnapshotCreationFailure());
+ 0,
+ (jobId, actualException) -> {
+ if (ClusterOptions.isAdaptiveSchedulerEnabled(new
Configuration())) {
+ return actualException
+ .getMessage()
+ .contains("Stop with savepoint operation could
not be completed");
+ } else {
+ Optional<StopWithSavepointStoppingException>
actualFlinkException =
+ findThrowable(
+ actualException,
StopWithSavepointStoppingException.class);
+ return actualFlinkException
+ .map(
+ e ->
+ e.getMessage()
+ .startsWith(
+ "A savepoint
has been created at:"))
+ .orElse(false);
+ }
+ },
+ false);
}
@Test
@@ -1051,7 +1147,8 @@ public class SavepointITCase extends TestLogger {
InfiniteTestSource failingSource,
File savepointDir,
int expectedMaximumNumberOfRestarts,
- BiFunction<JobID, ExecutionException, Boolean> exceptionAssertion)
+ BiFunction<JobID, ExecutionException, Boolean> exceptionAssertion,
+ boolean shouldRestart)
throws Exception {
MiniClusterWithClientResource cluster =
new MiniClusterWithClientResource(
@@ -1107,7 +1204,9 @@ public class SavepointITCase extends TestLogger {
assertThrowable(e, ex ->
exceptionAssertion.apply(jobGraph.getJobID(), e));
}
- waitUntilAllTasksAreRunning(cluster.getRestClusterClient(),
jobGraph.getJobID());
+ if (shouldRestart) {
+ waitUntilAllTasksAreRunning(cluster.getRestClusterClient(),
jobGraph.getJobID());
+ }
} finally {
cluster.after();
}