This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
     new 6814e5f  [FLINK-10753] Improve propagation and logging of snapshot exceptions
6814e5f is described below

commit 6814e5fc64a289a10dbd509dd6055448fc6be71d
Author: Stefan Richter <s.rich...@data-artisans.com>
AuthorDate: Thu Nov 8 15:51:16 2018 +0100

    [FLINK-10753] Improve propagation and logging of snapshot exceptions
    
    This closes #7064.
    
    Signed-off-by: Stefan Richter <s.rich...@data-artisans.com>
---
 .../runtime/checkpoint/CheckpointCoordinator.java   |  9 ++++++---
 .../flink/runtime/checkpoint/PendingCheckpoint.java | 21 ++++++++++-----------
 .../taskexecutor/rpc/RpcCheckpointResponder.java    |  6 ++++++
 .../api/operators/AbstractStreamOperator.java       |  6 ++++--
 .../restore/AbstractOperatorRestoreTestBase.java    |  1 +
 5 files changed, 27 insertions(+), 16 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 57337b6..02b6fa4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1249,11 +1249,14 @@ public class CheckpointCoordinator {
 
                final long checkpointId = pendingCheckpoint.getCheckpointId();
 
-               final String reason = (cause != null) ? cause.getMessage() : "";
+               LOG.info("Discarding checkpoint {} of job {}.", checkpointId, 
job, cause);
 
-               LOG.info("Discarding checkpoint {} of job {} because: {}", 
checkpointId, job, reason);
+               if (cause != null) {
+                       pendingCheckpoint.abortError(cause);
+               } else {
+                       pendingCheckpoint.abortDeclined();
+               }
 
-               pendingCheckpoint.abortDeclined();
                rememberRecentCheckpointId(checkpointId);
 
                // we don't have to schedule another "dissolving" checkpoint 
any more because the
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 1b51ac4..1bc6b0e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -34,6 +34,7 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -433,25 +434,23 @@ public class PendingCheckpoint {
                }
        }
 
+
        public void abortDeclined() {
-               try {
-                       Exception cause = new Exception("Checkpoint was 
declined (tasks not ready)");
-                       onCompletionPromise.completeExceptionally(cause);
-                       reportFailedCheckpoint(cause);
-               } finally {
-                       dispose(true);
-               }
+               abortWithCause(new Exception("Checkpoint was declined (tasks 
not ready)"));
        }
 
        /**
         * Aborts the pending checkpoint due to an error.
         * @param cause The error's exception.
         */
-       public void abortError(Throwable cause) {
+       public void abortError(@Nonnull Throwable cause) {
+               abortWithCause(new Exception("Checkpoint failed: " + 
cause.getMessage(), cause));
+       }
+
+       private void abortWithCause(@Nonnull Exception cause) {
                try {
-                       Exception failure = new Exception("Checkpoint failed: " 
+ cause.getMessage(), cause);
-                       onCompletionPromise.completeExceptionally(failure);
-                       reportFailedCheckpoint(failure);
+                       onCompletionPromise.completeExceptionally(cause);
+                       reportFailedCheckpoint(cause);
                } finally {
                        dispose(true);
                }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
index aba8bda..918fa50 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
@@ -26,8 +26,13 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class RpcCheckpointResponder implements CheckpointResponder {
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(RpcCheckpointResponder.class);
+
        private final CheckpointCoordinatorGateway checkpointCoordinatorGateway;
 
        public RpcCheckpointResponder(CheckpointCoordinatorGateway 
checkpointCoordinatorGateway) {
@@ -57,6 +62,7 @@ public class RpcCheckpointResponder implements 
CheckpointResponder {
                        long checkpointId, 
                        Throwable cause) {
 
+               LOG.info("Declining checkpoint {} of job {}.", checkpointId, 
jobID, cause);
                checkpointCoordinatorGateway.declineCheckpoint(jobID, 
executionAttemptID, checkpointId, cause);
        }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 97c1b06..5f5e4a2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -412,8 +412,10 @@ public abstract class AbstractStreamOperator<OUT>
                                snapshotException.addSuppressed(e);
                        }
 
-                       throw new Exception("Could not complete snapshot " + 
checkpointId + " for operator " +
-                               getOperatorName() + '.', snapshotException);
+                       String snapshotFailMessage = "Could not complete 
snapshot " + checkpointId + " for operator " +
+                               getOperatorName() + ".";
+
+                       throw new Exception(snapshotFailMessage, 
snapshotException);
                }
 
                return snapshotInProgress;
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index 7eebde8..b83507e 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -142,6 +142,7 @@ public abstract class AbstractOperatorRestoreTestBase 
extends TestLogger {
                        } catch (Exception e) {
                                String exceptionString = 
ExceptionUtils.stringifyException(e);
                                if 
(!(exceptionString.matches("(.*\n)*.*savepoint for the job .* failed(.*\n)*") 
// legacy
+                                               || 
exceptionString.matches("(.*\n)*.*was not running(.*\n)*")
                                                || 
exceptionString.matches("(.*\n)*.*Not all required tasks are currently 
running(.*\n)*") // new
                                                || 
exceptionString.matches("(.*\n)*.*Checkpoint was declined \\(tasks not 
ready\\)(.*\n)*"))) { // new
                                        throw e;

Reply via email to