This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch release-1.5 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.5 by this push: new 79bd161 [hotfix] CheckpointDeclineException should lead to pendingCheckpoint.abortDecline() 79bd161 is described below commit 79bd1611be2646da0b6805a74b360aa1d8e379fb Author: Stefan Richter <s.rich...@data-artisans.com> AuthorDate: Tue Nov 13 09:31:45 2018 +0100 [hotfix] CheckpointDeclineException should lead to pendingCheckpoint.abortDecline() We also avoid logging exceptions that are caused by instances of CheckpointDeclineException --- flink-end-to-end-tests/test-scripts/common.sh | 1 + .../apache/flink/runtime/checkpoint/CheckpointCoordinator.java | 7 ++++--- .../flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java | 8 +------- .../flink/streaming/api/operators/AbstractStreamOperator.java | 1 + 4 files changed, 7 insertions(+), 10 deletions(-) diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh index e8a4363..018e12d 100644 --- a/flink-end-to-end-tests/test-scripts/common.sh +++ b/flink-end-to-end-tests/test-scripts/common.sh @@ -254,6 +254,7 @@ function check_logs_for_exceptions { | grep -v "java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException" \ | grep -v "java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration" \ | grep -v "java.lang.Exception: Artificial failure" \ + | grep -v "org.apache.flink.runtime.checkpoint.decline" \ | grep -ic "exception") if [[ ${exception_count} -gt 0 ]]; then echo "Found exception in log files:" diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 51adeae..a3d59cc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -21,6 +21,7 @@ package 
org.apache.flink.runtime.checkpoint; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException; import org.apache.flink.runtime.checkpoint.hooks.MasterHooks; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.execution.ExecutionState; @@ -1249,10 +1250,10 @@ public class CheckpointCoordinator { LOG.info("Discarding checkpoint {} of job {}.", checkpointId, job, cause); - if (cause != null) { - pendingCheckpoint.abortError(cause); - } else { + if (cause == null || cause instanceof CheckpointDeclineException) { pendingCheckpoint.abortDeclined(); + } else { + pendingCheckpoint.abortError(cause); } rememberRecentCheckpointId(checkpointId); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java index 918fa50..c8f7357 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java @@ -26,13 +26,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.util.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class RpcCheckpointResponder implements CheckpointResponder { - private static final Logger LOG = LoggerFactory.getLogger(RpcCheckpointResponder.class); - private final CheckpointCoordinatorGateway checkpointCoordinatorGateway; public RpcCheckpointResponder(CheckpointCoordinatorGateway checkpointCoordinatorGateway) { @@ -59,10 +54,9 @@ public class RpcCheckpointResponder implements CheckpointResponder { public void declineCheckpoint( JobID 
jobID, ExecutionAttemptID executionAttemptID, - long checkpointId, + long checkpointId, Throwable cause) { - LOG.info("Declining checkpoint {} of job {}.", checkpointId, jobID, cause); checkpointCoordinatorGateway.declineCheckpoint(jobID, executionAttemptID, checkpointId, cause); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 19564d0..38ac9f3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -409,6 +409,7 @@ public abstract class AbstractStreamOperator<OUT> String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " + getOperatorName() + "."; + LOG.info(snapshotFailMessage, snapshotException); throw new Exception(snapshotFailMessage, snapshotException); }