This is an automated email from the ASF dual-hosted git repository.
nkruber pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new bc9f0fc [FLINK-16753][checkpointing] Use CheckpointException to wrap
exceptions thrown from AsyncCheckpointRunnable (#14072)
bc9f0fc is described below
commit bc9f0fc7d6a02211e1e30ecc425d2dbfe4a0bcb5
Author: Jiayi Liao <[email protected]>
AuthorDate: Mon Nov 16 17:20:56 2020 +0800
[FLINK-16753][checkpointing] Use CheckpointException to wrap exceptions
thrown from AsyncCheckpointRunnable (#14072)
---
.../checkpoint/CheckpointFailureManager.java | 1 +
.../checkpoint/CheckpointFailureReason.java | 2 +
.../runtime/tasks/AsyncCheckpointRunnable.java | 6 +-
.../runtime/tasks/AsyncCheckpointRunnableTest.java | 112 +++++++++++++++++++++
4 files changed, 120 insertions(+), 1 deletion(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index fbf5d98..ca8f42d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -117,6 +117,7 @@ public class CheckpointFailureManager {
case CHECKPOINT_DECLINED_INPUT_END_OF_STREAM:
case EXCEPTION:
+ case CHECKPOINT_ASYNC_EXCEPTION:
case TASK_FAILURE:
case TASK_CHECKPOINT_FAILURE:
case UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE:
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
index cd787d0..e20e57f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
@@ -36,6 +36,8 @@ public enum CheckpointFailureReason {
EXCEPTION(true, "An Exception occurred while triggering the checkpoint."),
+ CHECKPOINT_ASYNC_EXCEPTION(false, "Asynchronous task checkpoint failed."),
+
CHECKPOINT_EXPIRED(false, "Checkpoint expired before completing."),
CHECKPOINT_SUBSUMED(false, "Checkpoint has been subsumed."),
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
index af10411..e5aeb68 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
@@ -18,6 +18,8 @@
package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
@@ -195,7 +197,9 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable {
// We only report the exception for the original cause of fail and cleanup.
// Otherwise this followup exception could race the original exception in failing the task.
try {
- taskEnvironment.declineCheckpoint(checkpointMetaData.getCheckpointId(), checkpointException);
+ taskEnvironment.declineCheckpoint(
+ checkpointMetaData.getCheckpointId(),
+ new CheckpointException(CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION, checkpointException));
} catch (Exception unhandled) {
AsynchronousException asyncException = new AsynchronousException(unhandled);
asyncExceptionHandler.handleAsyncException("Failure in asynchronous checkpoint materialization", asyncException);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
new file mode 100644
index 0000000..30e8e80
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.TaskStateManager;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests for {@link AsyncCheckpointRunnable}.
+ */
+public class AsyncCheckpointRunnableTest {
+
+ @Test
+ public void testAsyncCheckpointException() {
+ final Map<OperatorID, OperatorSnapshotFutures> snapshotsInProgress = new HashMap<>();
+ snapshotsInProgress.put(
+ new OperatorID(),
+ new OperatorSnapshotFutures(
+ ExceptionallyDoneFuture.of(new RuntimeException("Async Checkpoint Exception")),
+ DoneFuture.of(SnapshotResult.empty()),
+ DoneFuture.of(SnapshotResult.empty()),
+ DoneFuture.of(SnapshotResult.empty()),
+ DoneFuture.of(SnapshotResult.empty()),
+ DoneFuture.of(SnapshotResult.empty())));
+
+ final TestEnvironment environment = new TestEnvironment();
+ final AsyncCheckpointRunnable runnable = new AsyncCheckpointRunnable(
+ snapshotsInProgress,
+ new CheckpointMetaData(1, 1L),
+ new CheckpointMetrics(),
+ 1L,
+ "Task Name",
+ r -> {},
+ r -> {},
+ environment,
+ (msg, ex) -> {});
+ runnable.run();
+
+ Assert.assertTrue(environment.getCause() instanceof CheckpointException);
+ Assert.assertSame(((CheckpointException) environment.getCause())
+ .getCheckpointFailureReason(),
CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION);
+ }
+
+ private static class TestEnvironment extends StreamMockEnvironment {
+
+ Throwable cause = null;
+
+ TestEnvironment() {
+ this(
+ new Configuration(),
+ new Configuration(),
+ new ExecutionConfig(),
+ 1L,
+ new MockInputSplitProvider(),
+ 1,
+ new TestTaskStateManager());
+ }
+
+ TestEnvironment(
+ Configuration jobConfig,
+ Configuration taskConfig,
+ ExecutionConfig executionConfig,
+ long memorySize,
+ MockInputSplitProvider inputSplitProvider,
+ int bufferSize,
+ TaskStateManager taskStateManager) {
+ super(jobConfig, taskConfig, executionConfig, memorySize, inputSplitProvider, bufferSize, taskStateManager);
+ }
+
+ @Override
+ public void declineCheckpoint(long checkpointId, Throwable cause) {
+ this.cause = cause;
+ }
+
+ Throwable getCause() {
+ return cause;
+ }
+ }
+}