Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4509#discussion_r141291264
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
---
@@ -122,6 +113,28 @@ public void testReceiveEmptyBuffer() throws Exception {
}
/**
+ * Verifies that {@link RemoteInputChannel#onSenderBacklog(int)} is
called when a
+ * {@link BufferResponse} is received.
+ */
+ @Test
+ public void testReceiveBacklog() throws Exception {
+ final RemoteInputChannel inputChannel =
mock(RemoteInputChannel.class);
+ when(inputChannel.getInputChannelId()).thenReturn(new
InputChannelID());
+
when(inputChannel.requestBuffer()).thenReturn(TestBufferFactory.createBuffer());
+
+ final int backlog = 10;
+ final BufferResponse receivedBuffer = createBufferResponse(
+ TestBufferFactory.createBuffer(), 0,
inputChannel.getInputChannelId(), backlog);
+
+ final CreditBasedClientHandler client = new
CreditBasedClientHandler();
+ client.addInputChannel(inputChannel);
+
+ client.channelRead(mock(ChannelHandlerContext.class),
receivedBuffer);
+
+ verify(inputChannel, times(1)).onSenderBacklog(backlog);
--- End diff --
Please also add this `onSenderBacklog(...)` verification to the `testReceiveEmptyBuffer()` method above.
---