This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d5043c9335347a512f11173dd815dfe08a244973
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Dec 2 14:58:24 2020 +0100

    [FLINK-19681][checkpointing] Don't timeout checkpoint on last barrier
---
 .../runtime/io/AlternatingController.java          | 23 +---------------------
 1 file changed, 1 insertion(+), 22 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
index e3f816c..0a45f97 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
@@ -145,28 +145,7 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll
 
        @Override
        public Optional<CheckpointBarrier> postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException {
-               Optional<CheckpointBarrier> maybeTimeOut = asTimedOut(barrier);
-               if (maybeTimeOut.isPresent() && activeController == alignedController) {
-                       switchToUnaligned(channelInfo, maybeTimeOut.get());
-                       checkState(activeController == unalignedController);
-                       checkState(!activeController.postProcessLastBarrier(channelInfo, maybeTimeOut.orElse(barrier)).isPresent());
-                       return maybeTimeOut;
-               }
-
-               barrier = maybeTimeOut.orElse(barrier);
-               if (barrier.getCheckpointOptions().isUnalignedCheckpoint()) {
-                       checkState(activeController == unalignedController);
-                       checkState(!activeController.postProcessLastBarrier(channelInfo, maybeTimeOut.orElse(barrier)).isPresent());
-                       return Optional.empty();
-               }
-               else {
-                       checkState(activeController == alignedController);
-                       Optional<CheckpointBarrier> triggerResult = activeController.postProcessLastBarrier(
-                               channelInfo,
-                               barrier);
-                       checkState(triggerResult.isPresent());
-                       return triggerResult;
-               }
+               return activeController.postProcessLastBarrier(channelInfo, barrier);
        }
 
        @Override

Reply via email to