[
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16196775#comment-16196775
]
ASF GitHub Bot commented on FLINK-7406:
---------------------------------------
Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4509#discussion_r143422899
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
---
@@ -374,6 +376,64 @@ public void testReleaseExclusiveBuffers() throws Exception {
verify(inputGate, times(1)).returnExclusiveSegments(anyListOf(MemorySegment.class));
}
+ /**
+ * Tests {@link BufferPool#requestBuffer()}, verifying the remote input channel tries to request
+ * floating buffers once receiving the producer's backlog.
+ */
+ @Test
+ public void testRequestFloatingBuffersOnBuffer() throws Exception {
+ // Setup
+ final BufferPool bufferPool = mock(BufferPool.class);
+ when(bufferPool.requestBuffer()).thenReturn(TestBufferFactory.createBuffer());
+
+ final SingleInputGate inputGate = mock(SingleInputGate.class);
+ when(inputGate.getBufferPool()).thenReturn(bufferPool);
+
+ final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
+
+ // Receive the producer's backlog
+ inputChannel.onBuffer(TestBufferFactory.createBuffer(), 0, 10);
+ // Need to request 10 floating buffers from buffer pool
+ verify(bufferPool, times(10)).requestBuffer();
+
+ inputChannel.onBuffer(TestBufferFactory.createBuffer(), 1, 8);
+ // No need to request extra floating buffers from pool because
+ // there are already 10 available buffers in queue now
+ verify(bufferPool, times(10)).requestBuffer();
+
+ inputChannel.onBuffer(TestBufferFactory.createBuffer(), 2, 11);
+ // Need to request another floating buffer from pool
+ verify(bufferPool, times(11)).requestBuffer();
+ }
+
+ /**
+ * Tests {@link BufferPool#requestBuffer()}, verifying the remote input channel tries to request
+ * floating buffers once receiving the producer's backlog. And it requests from pool only once
+ * and registers as listener if there are currently no available buffers in the pool.
+ */
+ @Test
+ public void testWaitForFloatingBuffersOnBuffer() throws Exception {
+ // Setup
+ final BufferPool bufferPool = mock(BufferPool.class);
+ when(bufferPool.requestBuffer()).thenReturn(null);
+ when(bufferPool.addBufferListener(any(BufferListener.class))).thenReturn(true);
+
+ final SingleInputGate inputGate = mock(SingleInputGate.class);
+ when(inputGate.getBufferPool()).thenReturn(bufferPool);
+ when(inputGate.getBufferProvider()).thenReturn(bufferPool);
+
+ final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
+
+ // Receive the producer's backlog
+ inputChannel.onBuffer(TestBufferFactory.createBuffer(), 0, 10);
+ // Request from pool only once if there are no available floating buffers
+ verify(bufferPool, times(1)).requestBuffer();
+
+ inputChannel.onBuffer(TestBufferFactory.createBuffer(), 1, 10);
+ // Already registers as listener to wait for notifications and will not request any more
+ verify(bufferPool, times(1)).requestBuffer();
+ }
+
--- End diff --
I guess you could add the "fair buffer distribution" test in this PR to
reduce the rebasing effort and avoid switching back and forth between PRs too
much.
> Implement Netty receiver incoming pipeline for credit-based
> -----------------------------------------------------------
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
> Issue Type: Sub-task
> Components: Network
> Reporter: zhijiang
> Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is part of the work on credit-based network flow control.
> Currently, {{PartitionRequestClientHandler}} receives and reads
> {{BufferResponse}} messages from the producer. It requests a buffer from the
> {{BufferPool}} to hold each message. If no buffer is available, the message is
> staged temporarily and {{autoread}} for the channel is set to false.
> In credit-based mode, {{PartitionRequestClientHandler}} can always get a
> buffer from {{RemoteInputChannel}} for reading messages from the producer.
> The related work items are:
> * Add the producer's backlog to the {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffers from
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates the backlog for
> {{RemoteInputChannel}}, which may trigger requests for floating buffers from
> {{BufferPool}}
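
The backlog-driven request behaviour that the two tests in the diff above verify can be sketched roughly as follows. This is a minimal, self-contained illustration and not Flink's actual implementation: `FloatingBufferSketch`, `Pool`, `Channel`, `CountingPool`, and `onSenderBacklog` are hypothetical names, and the sketch assumes a channel with no exclusive buffers that keeps every floating buffer it obtains.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical sketch of backlog-driven floating buffer requests; not Flink code. */
public class FloatingBufferSketch {

    /** Stand-in for a buffer pool; requestBuffer() returns null when the pool is empty. */
    interface Pool {
        Object requestBuffer();
        boolean addListener(Channel channel);
    }

    /** Stand-in for an input channel that tracks its available buffers. */
    static final class Channel {
        private final Pool pool;
        private final Queue<Object> available = new ArrayDeque<>();
        private boolean waitingForFloatingBuffers;

        Channel(Pool pool) {
            this.pool = pool;
        }

        /** Requests floating buffers until the available count covers the backlog. */
        void onSenderBacklog(int backlog) {
            while (available.size() < backlog && !waitingForFloatingBuffers) {
                Object buffer = pool.requestBuffer();
                if (buffer != null) {
                    available.add(buffer);
                } else if (pool.addListener(this)) {
                    // Pool is empty: stop polling and wait for a notification instead.
                    waitingForFloatingBuffers = true;
                } else {
                    // Listener was rejected (assumed to mean the pool is gone); give up.
                    break;
                }
            }
        }

        int availableBuffers() {
            return available.size();
        }
    }

    /** Test double that hands out a fixed number of buffers and counts requests. */
    static final class CountingPool implements Pool {
        private int remaining;
        int requests;

        CountingPool(int capacity) {
            this.remaining = capacity;
        }

        @Override
        public Object requestBuffer() {
            requests++;
            if (remaining == 0) {
                return null;
            }
            remaining--;
            return new Object();
        }

        @Override
        public boolean addListener(Channel channel) {
            return true;
        }
    }

    public static void main(String[] args) {
        CountingPool pool = new CountingPool(100);
        Channel channel = new Channel(pool);
        channel.onSenderBacklog(10); // 10 requests
        channel.onSenderBacklog(8);  // already covered, no new requests
        channel.onSenderBacklog(11); // one more request
        System.out.println(pool.requests); // prints 11
    }
}
```

With an empty pool (`new CountingPool(0)`), the first backlog announcement issues a single request, registers the channel as a listener, and later announcements issue no further requests, which mirrors the expectation in `testWaitForFloatingBuffersOnBuffer`.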