pnowojski commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r544377114



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -451,11 +451,20 @@ public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOEx
                                }
                                else {
                                        receivedBuffers.add(sequenceBuffer);
-                                       channelStatePersister.maybePersist(buffer);
                                        if (dataType.requiresAnnouncement()) {
                                                firstPriorityEvent = addPriorityBuffer(announce(sequenceBuffer));
                                        }
                                }
+                               channelStatePersister
+                                       .checkForBarrier(sequenceBuffer.buffer)
+                                       .filter(id -> id > lastBarrierId)
+                                       .ifPresent(id -> {
+                                               // checkpoint was not yet started by task thread,
+                                               // so remember the numbers of buffers to spill for the time when it will be started
+                                               lastBarrierId = id;
+                                               lastBarrierSequenceNumber = sequenceBuffer.sequenceNumber;
+                                       });
+                               channelStatePersister.maybePersist(buffer);

Review comment:
       And what about test coverage for those changes?
   
   Side question, shouldn't those two fixes be separate commits?
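
To make the coverage question concrete, here is a minimal, self-contained sketch of the bookkeeping the hunk above adds and the kind of case a test could exercise. `BarrierTrackerSketch` and `onBuffer(Optional<Long>, int)` are hypothetical stand-ins, not Flink classes, and the `Optional<Long>` parameter is only an assumption about the shape of what `checkForBarrier` returns.

```java
import java.util.Optional;

// Hypothetical stand-in for the barrier bookkeeping in the diff above; not a Flink class.
class BarrierTrackerSketch {
    private long lastBarrierId = -1;
    private int lastBarrierSequenceNumber = -1;

    // barrierId is empty when the buffer carries no barrier (assumed result shape).
    void onBuffer(Optional<Long> barrierId, int sequenceNumber) {
        barrierId
            .filter(id -> id > lastBarrierId) // ignore barriers that are not newer
            .ifPresent(id -> {
                // remember where the newest barrier sits in the sequence
                lastBarrierId = id;
                lastBarrierSequenceNumber = sequenceNumber;
            });
    }

    long getLastBarrierId() {
        return lastBarrierId;
    }

    int getLastBarrierSequenceNumber() {
        return lastBarrierSequenceNumber;
    }

    public static void main(String[] args) {
        BarrierTrackerSketch tracker = new BarrierTrackerSketch();
        tracker.onBuffer(Optional.empty(), 0); // plain data buffer
        tracker.onBuffer(Optional.of(2L), 1);  // barrier for checkpoint 2
        tracker.onBuffer(Optional.of(1L), 2);  // older barrier, must be ignored
        System.out.println(tracker.getLastBarrierId() + " @ " + tracker.getLastBarrierSequenceNumber());
        // prints "2 @ 1"
    }
}
```

A real test would drive `RemoteInputChannel.onBuffer` instead; the sketch only shows that an older barrier must not overwrite the remembered id and sequence number.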

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
##########
@@ -68,6 +68,22 @@
  */
 public class AlternatingControllerTest {
 
+       /**
+        * Upon subsuming (or canceling) a checkpoint, channels should be notified regardless of whether UC controller is
+        * currently being used or not. Otherwise, channels may capture in-flight buffers from an older checkpoint.

Review comment:
       > Otherwise, channels may capture in-flight buffers from an older checkpoint
   
   Is this test actually checking for that? I do not see any buffer that would belong to an older checkpoint?
   
   (I think I still do not understand this fix)

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
##########
@@ -84,6 +84,24 @@ public void testChannelResetOnNewBarrier() throws Exception {
                assertFalse(stateWriter.getAddedInput().isEmpty());
        }
 
+       /**
+        * If a checkpoint announcement was processed and then UC-barrier arrives (from the upstream)
+        * then it should be processed by the UC controller.
+        */
+       @Test
+       public void testSwitchToUnalignedByUpstream() throws Exception {
+               SingleInputGate inputGate = new SingleInputGateBuilder().setNumberOfChannels(2).build();
+               inputGate.setInputChannels(new TestInputChannel(inputGate, 0), new TestInputChannel(inputGate, 1));
+               ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
+               SingleCheckpointBarrierHandler barrierHandler = barrierHandler(inputGate, target);
+               CheckpointedInputGate gate = buildGate(target, 2);
+
+               CheckpointBarrier aligned = new CheckpointBarrier(1, System.currentTimeMillis(), alignedWithTimeout(getDefault(), Integer.MAX_VALUE));
+
+               send(toBuffer(new EventAnnouncement(aligned, 0), true), 0, gate); // process announcement but not the barrier
+               send(toBuffer(aligned.asUnaligned(), true), 1, gate); // pretend it came from upstream before the first (AC) barrier was picked up
+       }

Review comment:
       Aren't we missing some assertion?

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
##########
@@ -84,6 +84,24 @@ public void testChannelResetOnNewBarrier() throws Exception {
                assertFalse(stateWriter.getAddedInput().isEmpty());
        }
 
+       /**
+        * If a checkpoint announcement was processed and then UC-barrier arrives (from the upstream)
+        * then it should be processed by the UC controller.

Review comment:
       ```
         * If a checkpoint announcement was processed and then UC-barrier arrives (from the upstream)
         * then it should be processed by the UC controller.
   ```
   ->
   ```
         * If a checkpoint announcement was processed from one channel and then UC-barrier arrives
         * on another channel, this UC barrier should be processed by the UC controller.
   ```
   ?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
##########
@@ -114,6 +114,7 @@ public void barrierAnnouncement(
                        lastSeenBarrier = barrier.getId();
                        firstBarrierArrivalTime = getArrivalTime(barrier);
                }
+               activeController = chooseController(barrier);

Review comment:
       I still don't get this fix.
   > But there is a preProcessFirstBarrier before the switch in barrierReceived.
   
   Yes, but my intention was that in that case `preProcessFirstBarrier` would be called on the AlignedController. Then, on the first barrier, we would switch to the unaligned controller, which does this:
   ```
                // alignedController might has already processed some barriers, so "migrate"/forward those calls to unalignedController.
                unalignedController.preProcessFirstBarrier(channelInfo, barrier);
                for (InputChannelInfo blockedChannel : blockedChannels) {
                        unalignedController.barrierReceived(blockedChannel, barrier);
                }
                }
   ```
   so `preProcessFirstBarrier` would eventually be called on the unaligned controller.
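
The hand-over described above can be modelled in isolation. The following is a minimal sketch under loud assumptions: `ControllerSwitchSketch`, `RecordingController` and `switchToUnaligned` are hypothetical names, channels are plain `int`s and barriers plain `long` ids, and only the quoted migration snippet is modelled, not the rest of `AlternatingController`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified model of the controller hand-over; none of these are Flink classes.
class ControllerSwitchSketch {

    // Records every call so the order of forwarded calls can be inspected.
    static class RecordingController {
        final List<String> calls = new ArrayList<>();

        void preProcessFirstBarrier(int channel, long barrierId) {
            calls.add("preProcessFirstBarrier(" + channel + ", " + barrierId + ")");
        }

        void barrierReceived(int channel, long barrierId) {
            calls.add("barrierReceived(" + channel + ", " + barrierId + ")");
        }
    }

    // Mirrors the quoted snippet: replay what the aligned controller already saw.
    static void switchToUnaligned(
            RecordingController unaligned, int channel, long barrierId, List<Integer> blockedChannels) {
        unaligned.preProcessFirstBarrier(channel, barrierId);
        for (int blockedChannel : blockedChannels) {
            unaligned.barrierReceived(blockedChannel, barrierId);
        }
    }

    public static void main(String[] args) {
        RecordingController aligned = new RecordingController();
        RecordingController unaligned = new RecordingController();
        List<Integer> blockedChannels = new ArrayList<>();

        // Aligned barrier for checkpoint 1 arrives on channel 0; the aligned controller handles it.
        aligned.preProcessFirstBarrier(0, 1L);
        aligned.barrierReceived(0, 1L);
        blockedChannels.add(0);

        // UC barrier for the same checkpoint arrives on channel 1 and triggers the switch.
        switchToUnaligned(unaligned, 1, 1L, blockedChannels);

        System.out.println(unaligned.calls);
        // prints [preProcessFirstBarrier(1, 1), barrierReceived(0, 1)]
    }
}
```

In this model the unaligned controller sees `preProcessFirstBarrier` exactly once, at switch time, followed by one replayed `barrierReceived` per blocked channel.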




