This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 002a584ed6f55508ab7266a27433a1b72b59ddee
Author: Roman Khachatryan <[email protected]>
AuthorDate: Tue Nov 10 14:15:41 2020 +0100

    [FLINK-19681][checkpointing] Use converted barrier after disabling alignment
    
    Otherwise, further components (e.g. SubtaskCheckpointCoordinator) can
    get an AC barrier for the UC checkpoint.
---
 .../org/apache/flink/runtime/io/network/api/CheckpointBarrier.java    | 4 ++++
 .../org/apache/flink/streaming/runtime/io/AlternatingController.java  | 2 +-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
index 2ff1632..8058f7b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
@@ -117,4 +117,8 @@ public class CheckpointBarrier extends RuntimeEvent {
        public boolean isCheckpoint() {
                return !checkpointOptions.getCheckpointType().isSavepoint();
        }
+
+       public CheckpointBarrier asUnaligned() {
+               return checkpointOptions.isUnalignedCheckpoint() ? this : new 
CheckpointBarrier(getId(), getTimestamp(), 
getCheckpointOptions().toTimeouted());
+       }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
index 90b79c4..06e6bdb 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
@@ -88,7 +88,7 @@ public class AlternatingController implements 
CheckpointBarrierBehaviourControll
 
                if (maybeTimedOut.isPresent()) {
                        if (activeController == alignedController) {
-                               switchToUnaligned(channelInfo, barrier);
+                               switchToUnaligned(channelInfo, 
maybeTimedOut.get());
                                return maybeTimedOut;
                        }
                        else {

Reply via email to