Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4552#discussion_r161565896
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
---
@@ -71,4 +78,142 @@ public void testProducerFailedException() throws
Exception {
NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse)
msg;
assertTrue(err.cause instanceof CancelTaskException);
}
+
+ /**
+ * Tests {@link
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+ * verifying the reader would be enqueued in the pipeline if the next
sending buffer is event, even
+ * though it has no available credits.
+ */
+ @Test
+ public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
+ // setup
+ final ResultSubpartitionView view =
mock(ResultSubpartitionView.class);
+ when(view.nextBufferIsEvent()).thenReturn(true);
+
+ final ResultPartitionID partitionId = new ResultPartitionID();
+ final ResultPartitionProvider partitionProvider =
mock(ResultPartitionProvider.class);
+ when(partitionProvider.createSubpartitionView(
+ eq(partitionId),
+ eq(0),
+
any(BufferAvailabilityListener.class))).thenReturn(view);
+
+ final InputChannelID receiverId = new InputChannelID();
+ final PartitionRequestQueue queue = new PartitionRequestQueue();
+ final SequenceNumberingViewReader reader = new
SequenceNumberingViewReader(receiverId, 0, queue);
+ final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+ reader.requestSubpartitionView(partitionProvider, partitionId,
0);
+
+ // block the channel so that we see an intermediate state in
the test
+ ByteBuf channelBlockingBuffer = blockChannel(channel);
+ assertNull(channel.readOutbound());
+
+ // Notify an available event buffer to trigger enqueue the
reader
+ reader.notifyBuffersAvailable(1);
+
+ channel.runPendingTasks();
+
+ // The reader is enqueued in the pipeline because the next
buffer is an event, even though no credits are available
+ assertEquals(1, queue.getAvailableReaders().size());
+ assertEquals(0, reader.getNumCreditsAvailable());
+
+ // Flush the buffer to make the channel writable again and see
the final results
+ channel.flush();
+ assertSame(channelBlockingBuffer, channel.readOutbound());
+
+ assertEquals(0, queue.getAvailableReaders().size());
+ assertEquals(0, reader.getNumCreditsAvailable());
+ }
+
+ /**
+ * Tests {@link
PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
+ * verifying the reader would be enqueued in the pipeline iff it has
both available credits and buffers.
+ */
+ @Test
+ public void testEnqueueReaderByNotifyingBufferAndCredit() throws
Exception {
+ // setup
+ final ResultSubpartitionView view =
mock(ResultSubpartitionView.class);
+ when(view.nextBufferIsEvent()).thenReturn(false);
+ when(view.getNextBuffer()).thenReturn(new
BufferAndBacklog(TestBufferFactory.createBuffer(), 2, false));
+
+ final ResultPartitionID partitionId = new ResultPartitionID();
+ final ResultPartitionProvider partitionProvider =
mock(ResultPartitionProvider.class);
+ when(partitionProvider.createSubpartitionView(
+ eq(partitionId),
+ eq(0),
+
any(BufferAvailabilityListener.class))).thenReturn(view);
+
+ final InputChannelID receiverId = new InputChannelID();
+ final PartitionRequestQueue queue = new PartitionRequestQueue();
+ final SequenceNumberingViewReader reader = new
SequenceNumberingViewReader(receiverId, 0, queue);
+ final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+ reader.requestSubpartitionView(partitionProvider, partitionId,
0);
+ queue.notifyReaderCreated(reader);
+
+ // block the channel so that we see an intermediate state in
the test
+ ByteBuf channelBlockingBuffer = blockChannel(channel);
+ assertNull(channel.readOutbound());
+
+ // Notify available buffers to trigger enqueue the reader
+ final int notifyNumBuffers = 5;
+ for (int i = 0; i < notifyNumBuffers; i++) {
+ reader.notifyBuffersAvailable(1);
+ }
+
+ channel.runPendingTasks();
+
+ // the reader is not enqueued in the pipeline because no
credits are available
+ // -> it should still have the same number of pending buffers
+ assertEquals(0, queue.getAvailableReaders().size());
+ assertEquals(notifyNumBuffers, reader.getNumBuffersAvailable());
+ assertFalse(reader.isRegisteredAsAvailable());
+ assertEquals(0, reader.getNumCreditsAvailable());
+
+ // Notify available credits to trigger enqueue the reader again
+ final int notifyNumCredits = 3;
+ for (int i = 1; i <= notifyNumCredits; i++) {
+ queue.addCredit(receiverId, 1);
+
+ // the reader is enqueued in the pipeline because it
has both available buffers and credits
+ // since the channel is blocked though, we will not
process anything and only enqueue the
+ // reader once
+ assertTrue(reader.isRegisteredAsAvailable());
+ assertEquals(1, queue.getAvailableReaders().size());
--- End diff --
same here: `assertThat(queue.getAvailableReaders(), contains(reader));`
---