pnowojski commented on a change in pull request #15313:
URL: https://github.com/apache/flink/pull/15313#discussion_r598709748



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
##########
@@ -126,69 +202,38 @@ public void processBarrier(CheckpointBarrier barrier, 
InputChannelInfo channelIn
 
         if (currentCheckpointId > barrierId
                 || (currentCheckpointId == barrierId && 
!isCheckpointPending())) {
-            controller.obsoleteBarrierReceived(channelInfo, barrier);

Review comment:
       What has happened to that call? It had an important purpose of 
resuming/unblocking network channel if this obsoleted barrier was an aligned 
one.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
##########
@@ -126,69 +202,38 @@ public void processBarrier(CheckpointBarrier barrier, 
InputChannelInfo channelIn
 
         if (currentCheckpointId > barrierId
                 || (currentCheckpointId == barrierId && 
!isCheckpointPending())) {
-            controller.obsoleteBarrierReceived(channelInfo, barrier);
             return;
         }
 
-        checkSubsumedCheckpoint(barrier);
+        checkNewCheckpoint(barrier);
 
-        if (numBarriersReceived == 0) {
+        if (numBarriersReceived++ == 0) {

Review comment:
       nit: moving of this increment also ideally should be in another commit, 
just for the sake of making this large refactoring commit as small as possible.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
##########
@@ -365,4 +409,88 @@ public String toString() {
                 "%s: current checkpoint: %d, current barriers: %d, open 
channels: %d",
                 taskName, currentCheckpointId, numBarriersReceived, 
numOpenChannels);
     }
+
+    class ContextImpl implements BarrierHandleState.Context {
+        private final Map<InputChannelInfo, Integer> 
sequenceNumberInAnnouncedChannels =
+                new HashMap<>();
+
+        private final Set<InputChannelInfo> blockedChannels = new HashSet<>();

Review comment:
       Maybe those state fields should be moved to `BarrierHandleState`, while 
`ContextImpl` would become a stateless class - just a proxy for action calls?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
##########
@@ -126,69 +202,38 @@ public void processBarrier(CheckpointBarrier barrier, 
InputChannelInfo channelIn
 
         if (currentCheckpointId > barrierId
                 || (currentCheckpointId == barrierId && 
!isCheckpointPending())) {
-            controller.obsoleteBarrierReceived(channelInfo, barrier);
             return;
         }
 
-        checkSubsumedCheckpoint(barrier);
+        checkNewCheckpoint(barrier);
 
-        if (numBarriersReceived == 0) {
+        if (numBarriersReceived++ == 0) {
             if (getNumOpenChannels() == 1) {
                 markAlignmentStartAndEnd(barrier.getTimestamp());
             } else {
                 markAlignmentStart(barrier.getTimestamp());
             }
             allBarriersReceivedFuture = new CompletableFuture<>();
-
-            if (!handleBarrier(barrier, channelInfo, 
controller::preProcessFirstBarrier)) {
-                return;
-            }
-        }
-
-        if (!handleBarrier(barrier, channelInfo, controller::barrierReceived)) 
{
-            return;
         }
 
-        if (currentCheckpointId == barrierId) {

Review comment:
       What has happened with this check? `if (currentCheckpointId == 
barrierId) {`? I'm not sure if it  was important or not, but I just noticed it 
has disappeared.
   
   Was it a no-op? If so it would be probably a better idea to have it removed 
in a separate small hotfix/refactor commit? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to