This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 002a584ed6f55508ab7266a27433a1b72b59ddee Author: Roman Khachatryan <[email protected]> AuthorDate: Tue Nov 10 14:15:41 2020 +0100 [FLINK-19681][checkpointing] Use converted barrier after disabling alignment. Otherwise, further components (e.g. SubtaskCheckpointCoordinator) can get an aligned-checkpoint (AC) barrier for the unaligned (UC) checkpoint. --- .../org/apache/flink/runtime/io/network/api/CheckpointBarrier.java | 4 ++++ .../org/apache/flink/streaming/runtime/io/AlternatingController.java | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java index 2ff1632..8058f7b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java @@ -117,4 +117,8 @@ public class CheckpointBarrier extends RuntimeEvent { public boolean isCheckpoint() { return !checkpointOptions.getCheckpointType().isSavepoint(); } + + public CheckpointBarrier asUnaligned() { + return checkpointOptions.isUnalignedCheckpoint() ? 
this : new CheckpointBarrier(getId(), getTimestamp(), getCheckpointOptions().toTimeouted()); + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java index 90b79c4..06e6bdb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java @@ -88,7 +88,7 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll if (maybeTimedOut.isPresent()) { if (activeController == alignedController) { - switchToUnaligned(channelInfo, barrier); + switchToUnaligned(channelInfo, maybeTimedOut.get()); return maybeTimedOut; } else {
