[
https://issues.apache.org/jira/browse/FLINK-10753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16684147#comment-16684147
]
ASF GitHub Bot commented on FLINK-10753:
----------------------------------------
asfgit closed pull request #7064: [FLINK-10753] Improve propagation and logging
of snapshot exceptions
URL: https://github.com/apache/flink/pull/7064
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 57337b6286f..02b6fa4a2bb 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1249,11 +1249,14 @@ private void discardCheckpoint(PendingCheckpoint
pendingCheckpoint, @Nullable Th
final long checkpointId = pendingCheckpoint.getCheckpointId();
- final String reason = (cause != null) ? cause.getMessage() : "";
+ LOG.info("Discarding checkpoint {} of job {}.", checkpointId,
job, cause);
- LOG.info("Discarding checkpoint {} of job {} because: {}",
checkpointId, job, reason);
+ if (cause != null) {
+ pendingCheckpoint.abortError(cause);
+ } else {
+ pendingCheckpoint.abortDeclined();
+ }
- pendingCheckpoint.abortDeclined();
rememberRecentCheckpointId(checkpointId);
// we don't have to schedule another "dissolving" checkpoint
any more because the
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 1b51ac4bf8d..1bc6b0e4baa 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -34,6 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
@@ -433,25 +434,23 @@ public void abortSubsumed() {
}
}
+
public void abortDeclined() {
- try {
- Exception cause = new Exception("Checkpoint was
declined (tasks not ready)");
- onCompletionPromise.completeExceptionally(cause);
- reportFailedCheckpoint(cause);
- } finally {
- dispose(true);
- }
+ abortWithCause(new Exception("Checkpoint was declined (tasks
not ready)"));
}
/**
* Aborts the pending checkpoint due to an error.
* @param cause The error's exception.
*/
- public void abortError(Throwable cause) {
+ public void abortError(@Nonnull Throwable cause) {
+ abortWithCause(new Exception("Checkpoint failed: " +
cause.getMessage(), cause));
+ }
+
+ private void abortWithCause(@Nonnull Exception cause) {
try {
- Exception failure = new Exception("Checkpoint failed: "
+ cause.getMessage(), cause);
- onCompletionPromise.completeExceptionally(failure);
- reportFailedCheckpoint(failure);
+ onCompletionPromise.completeExceptionally(cause);
+ reportFailedCheckpoint(cause);
} finally {
dispose(true);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
index aba8bda1918..918fa50483d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
@@ -26,8 +26,13 @@
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class RpcCheckpointResponder implements CheckpointResponder {
+ private static final Logger LOG =
LoggerFactory.getLogger(RpcCheckpointResponder.class);
+
private final CheckpointCoordinatorGateway checkpointCoordinatorGateway;
public RpcCheckpointResponder(CheckpointCoordinatorGateway
checkpointCoordinatorGateway) {
@@ -57,6 +62,7 @@ public void declineCheckpoint(
long checkpointId,
Throwable cause) {
+ LOG.info("Declining checkpoint {} of job {}.", checkpointId,
jobID, cause);
checkpointCoordinatorGateway.declineCheckpoint(jobID,
executionAttemptID, checkpointId, cause);
}
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index a63a7971679..4967cb9ead9 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -413,8 +413,10 @@ public final OperatorSnapshotFutures snapshotState(long
checkpointId, long times
snapshotException.addSuppressed(e);
}
- throw new Exception("Could not complete snapshot " +
checkpointId + " for operator " +
- getOperatorName() + '.', snapshotException);
+ String snapshotFailMessage = "Could not complete
snapshot " + checkpointId + " for operator " +
+ getOperatorName() + ".";
+
+ throw new Exception(snapshotFailMessage,
snapshotException);
}
return snapshotInProgress;
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index 3db0f62f829..097616feb96 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -142,6 +142,7 @@ private String migrateJob(ClassLoader classLoader,
ClusterClient<?> clusterClien
} catch (Exception e) {
String exceptionString =
ExceptionUtils.stringifyException(e);
if
(!(exceptionString.matches("(.*\n)*.*savepoint for the job .* failed(.*\n)*")
// legacy
+ ||
exceptionString.matches("(.*\n)*.*was not running(.*\n)*")
||
exceptionString.matches("(.*\n)*.*Not all required tasks are currently
running(.*\n)*") // new
||
exceptionString.matches("(.*\n)*.*Checkpoint was declined \\(tasks not
ready\\)(.*\n)*"))) { // new
throw e;
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Propagate and log snapshotting exceptions
> -----------------------------------------
>
> Key: FLINK-10753
> URL: https://issues.apache.org/jira/browse/FLINK-10753
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.6.2, 1.7.0
> Reporter: Alexander Fedulov
> Assignee: Stefan Richter
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: Screen Shot 2018-11-01 at 16.27.01.png
>
>
> Upon failure, {{AbstractStreamOperator.snapshotState}} rethrows a new
> exception with the message "{{Could not complete snapshot {} for operator
> {}.}}" and the original exception as the cause.
> While handling the error, the {{CheckpointCoordinator.discardCheckpoint}} method
> logs only this propagated message and not the original cause of the
> exception.
> In addition, {{pendingCheckpoint.abortDeclined()}}, called from
> {{discardCheckpoint}}, reports the failed checkpoint with a misleading
> message "{{Checkpoint was declined (tasks not ready)}}". This message is what
> will be displayed in the UI (see attached).
> Proposition:
> # Log exception at the Task Manager (.snapshotState)
> # Log cause, instead of cause.getMessage(), at the JobManager
> (.discardCheckpoint)
> # Pass root cause to abortDeclined and propagate a more appropriate message
> to the UI.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)