This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.7 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.7 by this push: new 9477b6a [FLINK-10753] Improve propagation and logging of snapshot exceptions 9477b6a is described below commit 9477b6a9b8a640d70079543329f7de844403f88d Author: Stefan Richter <s.rich...@data-artisans.com> AuthorDate: Thu Nov 8 15:51:16 2018 +0100 [FLINK-10753] Improve propagation and logging of snapshot exceptions This closes #7064. Signed-off-by: Stefan Richter <s.rich...@data-artisans.com> --- .../runtime/checkpoint/CheckpointCoordinator.java | 9 ++++++--- .../flink/runtime/checkpoint/PendingCheckpoint.java | 21 ++++++++++----------- .../taskexecutor/rpc/RpcCheckpointResponder.java | 6 ++++++ .../api/operators/AbstractStreamOperator.java | 6 ++++-- .../restore/AbstractOperatorRestoreTestBase.java | 1 + 5 files changed, 27 insertions(+), 16 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 57337b6..02b6fa4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -1249,11 +1249,14 @@ public class CheckpointCoordinator { final long checkpointId = pendingCheckpoint.getCheckpointId(); - final String reason = (cause != null) ? 
cause.getMessage() : ""; + LOG.info("Discarding checkpoint {} of job {}.", checkpointId, job, cause); - LOG.info("Discarding checkpoint {} of job {} because: {}", checkpointId, job, reason); + if (cause != null) { + pendingCheckpoint.abortError(cause); + } else { + pendingCheckpoint.abortDeclined(); + } - pendingCheckpoint.abortDeclined(); rememberRecentCheckpointId(checkpointId); // we don't have to schedule another "dissolving" checkpoint any more because the diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 1b51ac4..1bc6b0e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -34,6 +34,7 @@ import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; @@ -433,25 +434,23 @@ public class PendingCheckpoint { } } + public void abortDeclined() { - try { - Exception cause = new Exception("Checkpoint was declined (tasks not ready)"); - onCompletionPromise.completeExceptionally(cause); - reportFailedCheckpoint(cause); - } finally { - dispose(true); - } + abortWithCause(new Exception("Checkpoint was declined (tasks not ready)")); } /** * Aborts the pending checkpoint due to an error. * @param cause The error's exception. 
*/ - public void abortError(Throwable cause) { + public void abortError(@Nonnull Throwable cause) { + abortWithCause(new Exception("Checkpoint failed: " + cause.getMessage(), cause)); + } + + private void abortWithCause(@Nonnull Exception cause) { try { - Exception failure = new Exception("Checkpoint failed: " + cause.getMessage(), cause); - onCompletionPromise.completeExceptionally(failure); - reportFailedCheckpoint(failure); + onCompletionPromise.completeExceptionally(cause); + reportFailedCheckpoint(cause); } finally { dispose(true); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java index aba8bda..918fa50 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java @@ -26,8 +26,13 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class RpcCheckpointResponder implements CheckpointResponder { + private static final Logger LOG = LoggerFactory.getLogger(RpcCheckpointResponder.class); + private final CheckpointCoordinatorGateway checkpointCoordinatorGateway; public RpcCheckpointResponder(CheckpointCoordinatorGateway checkpointCoordinatorGateway) { @@ -57,6 +62,7 @@ public class RpcCheckpointResponder implements CheckpointResponder { long checkpointId, Throwable cause) { + LOG.info("Declining checkpoint {} of job {}.", checkpointId, jobID, cause); checkpointCoordinatorGateway.declineCheckpoint(jobID, executionAttemptID, checkpointId, cause); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index a63a797..4967cb9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -413,8 +413,10 @@ public abstract class AbstractStreamOperator<OUT> snapshotException.addSuppressed(e); } - throw new Exception("Could not complete snapshot " + checkpointId + " for operator " + - getOperatorName() + '.', snapshotException); + String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " + + getOperatorName() + "."; + + throw new Exception(snapshotFailMessage, snapshotException); } return snapshotInProgress; diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java index 3db0f62..097616f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java @@ -142,6 +142,7 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger { } catch (Exception e) { String exceptionString = ExceptionUtils.stringifyException(e); if (!(exceptionString.matches("(.*\n)*.*savepoint for the job .* failed(.*\n)*") // legacy + || exceptionString.matches("(.*\n)*.*was not running(.*\n)*") || exceptionString.matches("(.*\n)*.*Not all required tasks are currently running(.*\n)*") // new || exceptionString.matches("(.*\n)*.*Checkpoint was declined \\(tasks not ready\\)(.*\n)*"))) { // new throw e;