AHeise commented on a change in pull request #14828:
URL: https://github.com/apache/flink/pull/14828#discussion_r567812836
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
##########
@@ -263,11 +264,21 @@ private void handleExecutionException(Exception e) {
// Otherwise this followup exception could race the original exception in
// failing the task.
try {
+ Optional<CheckpointException> underlyingCheckpointException =
+ ExceptionUtils.findThrowable(
+ checkpointException, CheckpointException.class);
+ // If this failure is already a CheckpointException, do not overwrite the
+ // original CheckpointFailureReason
+ CheckpointFailureReason reportedFailureReason =
Review comment:
How about `underlyingCheckpointException.map(...).orElse(...)`?
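
A minimal sketch of the `map(...).orElse(...)` shape suggested here, using stand-in types rather than Flink's classes. `FailureReason`, `ReasonedException`, `findReasoned`, `reportedReason` and the fallback value are hypothetical names chosen for illustration; the mapping function and default actually used in the PR are not visible in this excerpt.

```java
import java.util.Optional;

public class OptionalMapOrElseSketch {

    // Hypothetical stand-in for CheckpointFailureReason.
    enum FailureReason { DECLINED_END_OF_STREAM, ASYNC_EXCEPTION }

    // Hypothetical stand-in for CheckpointException: an exception that carries a reason.
    static class ReasonedException extends Exception {
        private final FailureReason reason;

        ReasonedException(FailureReason reason) {
            super(reason.name());
            this.reason = reason;
        }

        FailureReason getReason() {
            return reason;
        }
    }

    // Simplified stand-in for ExceptionUtils.findThrowable: walks the cause chain
    // and returns the first ReasonedException it finds, if any.
    static Optional<ReasonedException> findReasoned(Throwable failure) {
        for (Throwable cur = failure; cur != null; cur = cur.getCause()) {
            if (cur instanceof ReasonedException) {
                return Optional.of((ReasonedException) cur);
            }
        }
        return Optional.empty();
    }

    // Keep the original reason when one is present; otherwise use the fallback.
    static FailureReason reportedReason(Throwable failure, FailureReason fallback) {
        return findReasoned(failure)
                .map(ReasonedException::getReason)
                .orElse(fallback);
    }

    public static void main(String[] args) {
        Throwable wrapped =
                new RuntimeException(new ReasonedException(FailureReason.DECLINED_END_OF_STREAM));
        // Original reason is preserved.
        System.out.println(reportedReason(wrapped, FailureReason.ASYNC_EXCEPTION));
        // No reasoned exception in the chain, so the fallback is reported.
        System.out.println(reportedReason(new RuntimeException("boom"), FailureReason.ASYNC_EXCEPTION));
    }
}
```

Compared with an `isPresent()` check followed by `get()`, this keeps the "preserve the original reason, else fall back" rule in a single expression.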
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnableTest.java
##########
@@ -89,6 +78,47 @@ private void testAsyncCheckpointException(Supplier<Boolean> isTaskRunning) {
}
}
+ @Test
+ public void testDeclineAsyncCheckpoint() {
+ CheckpointFailureReason originalReason =
+ CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM;
+
+ final Map<OperatorID, OperatorSnapshotFutures> snapshotsInProgress = new HashMap<>();
+ snapshotsInProgress.put(
+ new OperatorID(),
+ new OperatorSnapshotFutures(
+ DoneFuture.of(SnapshotResult.empty()),
+ DoneFuture.of(SnapshotResult.empty()),
+ DoneFuture.of(SnapshotResult.empty()),
+ DoneFuture.of(SnapshotResult.empty()),
+ ExceptionallyDoneFuture.of(new CheckpointException(originalReason)),
+ DoneFuture.of(SnapshotResult.empty())));
+
+ final TestEnvironment environment = new TestEnvironment();
+ final AsyncCheckpointRunnable runnable =
+ createAsyncRunnable(snapshotsInProgress, environment, true);
+ runnable.run();
+
+ Assert.assertSame((environment.getCause()).getCheckpointFailureReason(), originalReason);
Review comment:
nit: Remove the extra parentheses around `environment.getCause()`? (Same in the test above that this was copied from.)
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
##########
@@ -257,8 +257,8 @@ private void abortInternal(long cancelledId, CheckpointException exception) thro
Math.max(lastCancelledOrCompletedCheckpointId, cancelledId);
numBarriersReceived = 0;
controller.abortPendingCheckpoint(cancelledId, exception);
- allBarriersReceivedFuture.completeExceptionally(exception);
notifyAbort(cancelledId, exception);
+ allBarriersReceivedFuture.completeExceptionally(exception);
Review comment:
I'd think that we should backport all fixes in this commit, so I'd be
okay with leaving it as is.