This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.5 by this push:
     new 79bd161  [hotfix] CheckpointDeclineException should lead to 
pendingCheckpoint.abortDecline()
79bd161 is described below

commit 79bd1611be2646da0b6805a74b360aa1d8e379fb
Author: Stefan Richter <s.rich...@data-artisans.com>
AuthorDate: Tue Nov 13 09:31:45 2018 +0100

    [hotfix] CheckpointDeclineException should lead to 
pendingCheckpoint.abortDecline()
    
    We also avoid logging exceptions that are caused by instances of 
CheckpointDeclineException
---
 flink-end-to-end-tests/test-scripts/common.sh                     | 1 +
 .../apache/flink/runtime/checkpoint/CheckpointCoordinator.java    | 7 ++++---
 .../flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java    | 8 +-------
 .../flink/streaming/api/operators/AbstractStreamOperator.java     | 1 +
 4 files changed, 7 insertions(+), 10 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/common.sh 
b/flink-end-to-end-tests/test-scripts/common.sh
index e8a4363..018e12d 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -254,6 +254,7 @@ function check_logs_for_exceptions {
       | grep -v "java.lang.NoClassDefFoundError: 
org/apache/hadoop/yarn/exceptions/YarnException" \
       | grep -v "java.lang.NoClassDefFoundError: 
org/apache/hadoop/conf/Configuration" \
       | grep -v "java.lang.Exception: Artificial failure" \
+      | grep -v "org.apache.flink.runtime.checkpoint.decline" \
       | grep -ic "exception")
   if [[ ${exception_count} -gt 0 ]]; then
     echo "Found exception in log files:"
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 51adeae..a3d59cc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
 import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -1249,10 +1250,10 @@ public class CheckpointCoordinator {
 
                LOG.info("Discarding checkpoint {} of job {}.", checkpointId, 
job, cause);
 
-               if (cause != null) {
-                       pendingCheckpoint.abortError(cause);
-               } else {
+               if (cause == null || cause instanceof 
CheckpointDeclineException) {
                        pendingCheckpoint.abortDeclined();
+               } else {
+                       pendingCheckpoint.abortError(cause);
                }
 
                rememberRecentCheckpointId(checkpointId);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
index 918fa50..c8f7357 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
@@ -26,13 +26,8 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.util.Preconditions;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 public class RpcCheckpointResponder implements CheckpointResponder {
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(RpcCheckpointResponder.class);
-
        private final CheckpointCoordinatorGateway checkpointCoordinatorGateway;
 
        public RpcCheckpointResponder(CheckpointCoordinatorGateway 
checkpointCoordinatorGateway) {
@@ -59,10 +54,9 @@ public class RpcCheckpointResponder implements 
CheckpointResponder {
        public void declineCheckpoint(
                        JobID jobID,
                        ExecutionAttemptID executionAttemptID,
-                       long checkpointId, 
+                       long checkpointId,
                        Throwable cause) {
 
-               LOG.info("Declining checkpoint {} of job {}.", checkpointId, 
jobID, cause);
                checkpointCoordinatorGateway.declineCheckpoint(jobID, 
executionAttemptID, checkpointId, cause);
        }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 19564d0..38ac9f3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -409,6 +409,7 @@ public abstract class AbstractStreamOperator<OUT>
                        String snapshotFailMessage = "Could not complete 
snapshot " + checkpointId + " for operator " +
                                getOperatorName() + ".";
 
+                       LOG.info(snapshotFailMessage, snapshotException);
                        throw new Exception(snapshotFailMessage, 
snapshotException);
                }
 

Reply via email to