pnowojski commented on a change in pull request #14057:
URL: https://github.com/apache/flink/pull/14057#discussion_r544377114
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -451,11 +451,20 @@ public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOEx
}
else {
receivedBuffers.add(sequenceBuffer);
- channelStatePersister.maybePersist(buffer);
if (dataType.requiresAnnouncement()) {
firstPriorityEvent = addPriorityBuffer(announce(sequenceBuffer));
}
}
+ channelStatePersister
+ .checkForBarrier(sequenceBuffer.buffer)
+ .filter(id -> id > lastBarrierId)
+ .ifPresent(id -> {
+ // checkpoint was not yet started by task thread,
+ // so remember the numbers of buffers to spill for the time when it will be started
+ lastBarrierId = id;
+ lastBarrierSequenceNumber = sequenceBuffer.sequenceNumber;
+ });
+ channelStatePersister.maybePersist(buffer);
Review comment:
And what about test coverage for those changes?
Side question, shouldn't those two fixes be separate commits?
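To make the coverage question concrete, here is a minimal, self-contained sketch of the bookkeeping the hunk above adds: a barrier ID is recorded, together with the buffer's sequence number, only when it is newer than the last one seen. `BarrierTracker`, its `onBuffer` method, and the `Optional<Long>` parameter are hypothetical stand-ins for illustration; they are not the API of `RemoteInputChannel` or of the channel state persister.

```java
import java.util.Optional;

// Hypothetical stand-in for the bookkeeping added to onBuffer(); not Flink code.
final class BarrierTracker {
    private long lastBarrierId = -1;
    private int lastBarrierSequenceNumber = -1;

    // barrierId is empty for ordinary data buffers (an assumption made for this sketch).
    void onBuffer(Optional<Long> barrierId, int sequenceNumber) {
        barrierId
            .filter(id -> id > lastBarrierId) // ignore stale or repeated barriers
            .ifPresent(id -> {
                // remember where the newest barrier sits in the buffer sequence
                lastBarrierId = id;
                lastBarrierSequenceNumber = sequenceNumber;
            });
    }

    long getLastBarrierId() {
        return lastBarrierId;
    }

    int getLastBarrierSequenceNumber() {
        return lastBarrierSequenceNumber;
    }
}
```

A unit test along these lines could feed a data buffer, a new barrier, and a stale barrier, then check that only the new barrier updates both fields.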
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
##########
@@ -68,6 +68,22 @@
*/
public class AlternatingControllerTest {
+ /**
+ * Upon subsuming (or canceling) a checkpoint, channels should be notified regardless of whether UC controller is
+ * currently being used or not. Otherwise, channels may capture in-flight buffers from an older checkpoint.
Review comment:
> Otherwise, channels may capture in-flight buffers from an older checkpoint
Is this test actually checking for that? I do not see any buffer that would belong to an older checkpoint.
(I think I still do not understand this fix.)
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
##########
@@ -84,6 +84,24 @@ public void testChannelResetOnNewBarrier() throws Exception {
assertFalse(stateWriter.getAddedInput().isEmpty());
}
+ /**
+ * If a checkpoint announcement was processed and then UC-barrier arrives (from the upstream)
+ * then it should be processed by the UC controller.
+ */
+ @Test
+ public void testSwitchToUnalignedByUpstream() throws Exception {
+ SingleInputGate inputGate = new SingleInputGateBuilder().setNumberOfChannels(2).build();
+ inputGate.setInputChannels(new TestInputChannel(inputGate, 0), new TestInputChannel(inputGate, 1));
+ ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
+ SingleCheckpointBarrierHandler barrierHandler = barrierHandler(inputGate, target);
+ CheckpointedInputGate gate = buildGate(target, 2);
+
+ CheckpointBarrier aligned = new CheckpointBarrier(1, System.currentTimeMillis(), alignedWithTimeout(getDefault(), Integer.MAX_VALUE));
+
+ send(toBuffer(new EventAnnouncement(aligned, 0), true), 0, gate); // process announcement but not the barrier
+ send(toBuffer(aligned.asUnaligned(), true), 1, gate); // pretend it came from upstream before the first (AC) barrier was picked up
+ }
Review comment:
Aren't we missing some assertion?
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/AlternatingControllerTest.java
##########
@@ -84,6 +84,24 @@ public void testChannelResetOnNewBarrier() throws Exception {
assertFalse(stateWriter.getAddedInput().isEmpty());
}
+ /**
+ * If a checkpoint announcement was processed and then UC-barrier arrives (from the upstream)
+ * then it should be processed by the UC controller.
Review comment:
```
* If a checkpoint announcement was processed and then UC-barrier arrives (from the upstream)
* then it should be processed by the UC controller.
```
->
```
* If a checkpoint announcement was processed from one channel and then UC-barrier arrives
* on another channel, this UC barrier should be processed by the UC controller.
```
?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java
##########
@@ -114,6 +114,7 @@ public void barrierAnnouncement(
lastSeenBarrier = barrier.getId();
firstBarrierArrivalTime = getArrivalTime(barrier);
}
+ activeController = chooseController(barrier);
Review comment:
I still don't get this fix.
> But there is a preProcessFirstBarrier before the switch in barrierReceived.
Yes, but my intention was that, in that case, `preProcessFirstBarrier` would be called on the AlignedController. Then, on the first barrier, we would switch to the unaligned controller, which does this:
```
// alignedController might has already processed some barriers, so "migrate"/forward those calls to unalignedController.
unalignedController.preProcessFirstBarrier(channelInfo, barrier);
for (InputChannelInfo blockedChannel : blockedChannels) {
    unalignedController.barrierReceived(blockedChannel, barrier);
}
```
so `preProcessFirstBarrier` would eventually be called on the unaligned controller.
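The hand-over described above can be modeled in isolation. The following is only a rough sketch under simplifying assumptions: `BarrierControllerSketch`, `RecordingControllerSketch`, and `SwitchingControllerSketch` are hypothetical names, channels are plain `int` indexes, and barriers are plain `long` IDs, so none of this is Flink's actual controller code. It only illustrates that calls already seen by the aligned controller are replayed on the unaligned one at switch time.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified controller contract; not Flink's actual interface.
interface BarrierControllerSketch {
    void preProcessFirstBarrier(int channel, long barrierId);

    void barrierReceived(int channel, long barrierId);
}

// Records every call so the order of forwarded calls can be inspected.
final class RecordingControllerSketch implements BarrierControllerSketch {
    final List<String> calls = new ArrayList<>();

    @Override
    public void preProcessFirstBarrier(int channel, long barrierId) {
        calls.add("preProcessFirstBarrier(" + channel + ")");
    }

    @Override
    public void barrierReceived(int channel, long barrierId) {
        calls.add("barrierReceived(" + channel + ")");
    }
}

final class SwitchingControllerSketch {
    final RecordingControllerSketch aligned = new RecordingControllerSketch();
    final RecordingControllerSketch unaligned = new RecordingControllerSketch();
    private final List<Integer> blockedChannels = new ArrayList<>();
    private boolean firstBarrierSeen = false;
    private boolean unalignedActive = false;

    // An aligned barrier is handled by the aligned controller and blocks its channel.
    void alignedBarrier(int channel, long barrierId) {
        if (!firstBarrierSeen) {
            aligned.preProcessFirstBarrier(channel, barrierId);
            firstBarrierSeen = true;
        }
        aligned.barrierReceived(channel, barrierId);
        blockedChannels.add(channel);
    }

    // Switching "migrates" the calls the aligned controller has already processed.
    void switchToUnaligned(int channel, long barrierId) {
        unaligned.preProcessFirstBarrier(channel, barrierId);
        for (int blockedChannel : blockedChannels) {
            unaligned.barrierReceived(blockedChannel, barrierId);
        }
        unalignedActive = true;
    }

    boolean isUnalignedActive() {
        return unalignedActive;
    }
}
```

In this model, after `alignedBarrier(0, 1L)` followed by `switchToUnaligned(1, 1L)`, the unaligned controller has seen `preProcessFirstBarrier` for channel 1 and a forwarded `barrierReceived` for the blocked channel 0.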