This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d5043c9335347a512f11173dd815dfe08a244973 Author: Roman Khachatryan <[email protected]> AuthorDate: Wed Dec 2 14:58:24 2020 +0100 [FLINK-19681][checkpointing] Don't timeout checkpoint on last barrier --- .../runtime/io/AlternatingController.java | 23 +--------------------- 1 file changed, 1 insertion(+), 22 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java index e3f816c..0a45f97 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java @@ -145,28 +145,7 @@ public class AlternatingController implements CheckpointBarrierBehaviourControll @Override public Optional<CheckpointBarrier> postProcessLastBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException { - Optional<CheckpointBarrier> maybeTimeOut = asTimedOut(barrier); - if (maybeTimeOut.isPresent() && activeController == alignedController) { - switchToUnaligned(channelInfo, maybeTimeOut.get()); - checkState(activeController == unalignedController); - checkState(!activeController.postProcessLastBarrier(channelInfo, maybeTimeOut.orElse(barrier)).isPresent()); - return maybeTimeOut; - } - - barrier = maybeTimeOut.orElse(barrier); - if (barrier.getCheckpointOptions().isUnalignedCheckpoint()) { - checkState(activeController == unalignedController); - checkState(!activeController.postProcessLastBarrier(channelInfo, maybeTimeOut.orElse(barrier)).isPresent()); - return Optional.empty(); - } - else { - checkState(activeController == alignedController); - Optional<CheckpointBarrier> triggerResult = activeController.postProcessLastBarrier( - channelInfo, - barrier); - checkState(triggerResult.isPresent()); - return triggerResult; - } + return activeController.postProcessLastBarrier(channelInfo, barrier); } @Override
