pnowojski commented on a change in pull request #11948:
URL: https://github.com/apache/flink/pull/11948#discussion_r424678536
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
##########
@@ -109,10 +109,17 @@ private static CheckpointBarrierHandler createCheckpointBarrierHandler(
switch (config.getCheckpointMode()) {
case EXACTLY_ONCE:
if (config.isUnalignedCheckpointsEnabled()) {
- return new CheckpointBarrierUnaligner(
- numberOfInputChannelsPerGate.toArray(),
- channelStateWriter,
- taskName,
+ return new AlternatingCheckpointBarrierHandler(
+ new CheckpointBarrierAligner(
+ taskName,
+ channelIndexToInputGate,
+ inputGateToChannelIndexOffset,
+ toNotifyOnCheckpoint),
+ new CheckpointBarrierUnaligner(
+ numberOfInputChannelsPerGate.toArray(),
+ channelStateWriter,
+ taskName,
+ toNotifyOnCheckpoint),
Review comment:
There are two methods in `CheckpointedInputGate` that are not implemented in a
very object-oriented way (I guess they should be re-implemented) and that would
stop working with this change, because both cast `barrierHandler` to
`CheckpointBarrierUnaligner` (I hope some tests will fail because of that):
```
public void spillInflightBuffers(
        long checkpointId,
        int channelIndex,
        ChannelStateWriter channelStateWriter) throws IOException {
    if (((CheckpointBarrierUnaligner) barrierHandler).hasInflightData(checkpointId, channelIndex)) {
        inputGate.getChannel(channelIndex).spillInflightBuffers(checkpointId, channelStateWriter);
    }
}

public CompletableFuture<Void> getAllBarriersReceivedFuture(long checkpointId) {
    return ((CheckpointBarrierUnaligner) barrierHandler).getAllBarriersReceivedFuture(checkpointId);
}
```
I guess `hasInflightData()` and `getAllBarriersReceivedFuture()` should be
pulled up into `CheckpointBarrierHandler` (in a separate commit?).
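A minimal sketch of what that pull-up could look like, assuming simplified stand-in classes rather than the real Flink types. All `*Sketch` names, the default return values in the base class, and `switchToUnaligned()` / `onAllBarriersReceived()` are hypothetical and only illustrate how the gate could call the handler without a cast:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for CheckpointBarrierHandler (not Flink's real class).
abstract class BarrierHandlerSketch {
    // Assumed default: a handler that tracks no in-flight data reports none.
    boolean hasInflightData(long checkpointId, int channelIndex) {
        return false;
    }

    // Assumed default: nothing to wait for, so the future is already complete.
    CompletableFuture<Void> getAllBarriersReceivedFuture(long checkpointId) {
        return CompletableFuture.completedFuture(null);
    }
}

// Stand-in for CheckpointBarrierAligner: simply inherits the defaults.
class AlignerSketch extends BarrierHandlerSketch {
}

// Stand-in for CheckpointBarrierUnaligner: overrides both methods.
class UnalignerSketch extends BarrierHandlerSketch {
    private final CompletableFuture<Void> allBarriersReceived = new CompletableFuture<>();

    @Override
    boolean hasInflightData(long checkpointId, int channelIndex) {
        return true; // simplification: every channel is treated as having in-flight data
    }

    @Override
    CompletableFuture<Void> getAllBarriersReceivedFuture(long checkpointId) {
        return allBarriersReceived;
    }

    // Hypothetical hook that completes the future once all barriers have arrived.
    void onAllBarriersReceived() {
        allBarriersReceived.complete(null);
    }
}

// Stand-in for AlternatingCheckpointBarrierHandler: delegates to the active handler.
class AlternatingSketch extends BarrierHandlerSketch {
    private final BarrierHandlerSketch unaligned;
    private BarrierHandlerSketch active;

    AlternatingSketch(BarrierHandlerSketch aligned, BarrierHandlerSketch unaligned) {
        this.unaligned = unaligned;
        this.active = aligned; // start in aligned mode
    }

    void switchToUnaligned() {
        active = unaligned;
    }

    @Override
    boolean hasInflightData(long checkpointId, int channelIndex) {
        return active.hasInflightData(checkpointId, channelIndex);
    }

    @Override
    CompletableFuture<Void> getAllBarriersReceivedFuture(long checkpointId) {
        return active.getAllBarriersReceivedFuture(checkpointId);
    }
}

// Stand-in for CheckpointedInputGate: calls the base type, no cast required.
class GateSketch {
    private final BarrierHandlerSketch barrierHandler;

    GateSketch(BarrierHandlerSketch barrierHandler) {
        this.barrierHandler = barrierHandler;
    }

    boolean shouldSpill(long checkpointId, int channelIndex) {
        return barrierHandler.hasInflightData(checkpointId, channelIndex);
    }

    CompletableFuture<Void> getAllBarriersReceivedFuture(long checkpointId) {
        return barrierHandler.getAllBarriersReceivedFuture(checkpointId);
    }
}
```

With this shape the gate keeps working whichever handler is installed, including a wrapper that alternates between the two. Whether the base class should provide defaults or declare the methods abstract is a design choice left open here.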
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
##########
@@ -456,6 +463,23 @@ public void testUnblockReleasedChannel() throws Exception {
localChannel.resumeConsumption();
}
+ @Test
+ public void testNoNotifyOnSavepoint() throws IOException {
+ TestBufferReceivedListener listener = new TestBufferReceivedListener();
+ LocalInputChannel channel = new LocalInputChannel(
+ new SingleInputGateBuilder().build(),
+ 0,
+ new ResultPartitionID(),
+ new ResultPartitionManager(),
+ new TaskEventDispatcher(),
+ new TestCounter(),
+ new TestCounter());
+ CheckpointBarrier barrier = new CheckpointBarrier(123L, 123L, new CheckpointOptions(SAVEPOINT, CheckpointStorageLocationReference.getDefault()));
+ channel.notifyPriorityEvent(new BufferConsumer(toBuffer(barrier).getMemorySegment(), FreeingBufferRecycler.INSTANCE, getDataType(barrier)));
+ channel.checkError();
+ assertTrue(listener.notifiedOnBarriers.isEmpty());
Review comment:
Frankly, it's not that unexpected that it's empty, since `listener` was
never passed anywhere? 😈
(Did you forget to register it with the `SingleInputGate`?)
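To illustrate why the assertion is vacuous as written, here is a small sketch with hypothetical stand-ins (`ListenerSketch`, `ChannelSketch`, `registerListener`, `onPriorityBarrier`); none of these are Flink APIs. An unregistered listener stays empty for any barrier, so the test only proves something once the listener is registered and a regular checkpoint barrier is shown to reach it:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for TestBufferReceivedListener: records notified checkpoint ids.
class ListenerSketch {
    final List<Long> notifiedOnBarriers = new ArrayList<>();

    void notifyBarrierReceived(long checkpointId) {
        notifiedOnBarriers.add(checkpointId);
    }
}

// Hypothetical stand-in for a channel (plus its gate) that forwards priority barriers.
class ChannelSketch {
    private ListenerSketch listener; // stays null until a listener is registered

    void registerListener(ListenerSketch listener) {
        this.listener = listener;
    }

    // Assumed behaviour under test: savepoint barriers do not notify the listener.
    void onPriorityBarrier(long checkpointId, boolean isSavepoint) {
        if (listener != null && !isSavepoint) {
            listener.notifyBarrierReceived(checkpointId);
        }
    }
}
```

A test built on this would register the listener first, send a regular checkpoint barrier to confirm that notifications arrive at all, and only then assert that a savepoint barrier adds nothing.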
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
##########
@@ -456,6 +463,23 @@ public void testUnblockReleasedChannel() throws Exception {
localChannel.resumeConsumption();
}
+ @Test
+ public void testNoNotifyOnSavepoint() throws IOException {
+ TestBufferReceivedListener listener = new TestBufferReceivedListener();
+ LocalInputChannel channel = new LocalInputChannel(
+ new SingleInputGateBuilder().build(),
+ 0,
+ new ResultPartitionID(),
+ new ResultPartitionManager(),
+ new TaskEventDispatcher(),
+ new TestCounter(),
+ new TestCounter());
Review comment:
nit:
```
InputChannelBuilder
.newBuilder()
.buildLocalChannel(new SingleInputGateBuilder().build())
```
?