[
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264689#comment-16264689
]
ASF GitHub Bot commented on FLINK-7406:
---------------------------------------
Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4509#discussion_r152856498
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
---
@@ -306,51 +306,88 @@ public void testProducerFailedException() throws
Exception {
}
/**
- * Tests to verify that the input channel requests floating buffers
from buffer pool
- * in order to maintain backlog + initialCredit buffers available once
receiving the
- * sender's backlog, and registers as listener if no floating buffers
available.
+ * Tests to verify that the input channel requests floating buffers
from buffer pool for
+ * maintaining (backlog + initialCredit) available buffers once
receiving the sender's backlog.
+ *
+ * <p>Verifies the logic of recycling floating buffer back into the
input channel and the logic
+ * of returning extra floating buffer into the buffer pool during
recycling exclusive buffer.
*/
@Test
- public void testRequestFloatingBufferOnSenderBacklog() throws Exception
{
+ public void testRequestAndReturnFloatingBuffer() throws Exception {
// Setup
- final NetworkBufferPool networkBufferPool = new
NetworkBufferPool(12, 32, MemoryType.HEAP);
+ final NetworkBufferPool networkBufferPool = new
NetworkBufferPool(14, 32, MemoryType.HEAP);
+ final int numExclusiveBuffers = 2;
+ final int numFloatingBuffers = 12;
+
final SingleInputGate inputGate = createSingleInputGate();
final RemoteInputChannel inputChannel =
createRemoteInputChannel(inputGate);
+
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(),
inputChannel);
try {
- final int numFloatingBuffers = 10;
final BufferPool bufferPool =
spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
inputGate.setBufferPool(bufferPool);
-
- // Assign exclusive segments to the channel
- final int numExclusiveBuffers = 2;
-
inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(),
inputChannel);
inputGate.assignExclusiveSegments(networkBufferPool,
numExclusiveBuffers);
- assertEquals("There should be " + numExclusiveBuffers +
" buffers available in the channel",
- numExclusiveBuffers,
inputChannel.getNumberOfAvailableBuffers());
+ // Prepare the exclusive and floating buffers to verify
recycle logic later
+ Buffer exclusiveBuffer = inputChannel.requestBuffer();
+ assertNotNull(exclusiveBuffer);
+ Buffer floatingBuffer1 = bufferPool.requestBuffer();
+ assertNotNull(floatingBuffer1);
+ Buffer floatingBuffer2 = bufferPool.requestBuffer();
+ assertNotNull(floatingBuffer2);
- // Receive the producer's backlog
+ // Receive the producer's backlog less than the number
of available floating buffers
inputChannel.onSenderBacklog(8);
- // Request the number of floating buffers by the
formula of backlog + initialCredit - availableBuffers
- verify(bufferPool, times(8)).requestBuffer();
+ // Request the floating buffers to maintain (backlog +
initialCredit) available buffers
+ verify(bufferPool, times(11)).requestBuffer();
verify(bufferPool,
times(0)).addBufferListener(inputChannel);
- assertEquals("There should be 10 buffers available in
the channel",
- 10, inputChannel.getNumberOfAvailableBuffers());
+ assertEquals("There should be 10 buffers available in
the channel", 10, inputChannel.getNumberOfAvailableBuffers());
+ assertEquals("There should be 10 buffers required in
the channel", 10, inputChannel.getNumberOfRequiredBuffers());
- inputChannel.onSenderBacklog(11);
+ // Increase the backlog to exceed the number of
available floating buffers
+ inputChannel.onSenderBacklog(10);
- // Need extra three floating buffers, but only two
buffers available in buffer pool, register as listener as a result
- verify(bufferPool, times(11)).requestBuffer();
+ // The channel does not get enough floating buffer and
register as buffer listener
+ verify(bufferPool, times(13)).requestBuffer();
verify(bufferPool,
times(1)).addBufferListener(inputChannel);
- assertEquals("There should be 12 buffers available in
the channel",
- 12, inputChannel.getNumberOfAvailableBuffers());
+ assertEquals("There should be 11 buffers available in
the channel", 11, inputChannel.getNumberOfAvailableBuffers());
+ assertEquals("There should be 12 buffers required in
the channel", 12, inputChannel.getNumberOfRequiredBuffers());
+ assertEquals("There should be 0 buffer available in
local pool", 0, bufferPool.getNumberOfAvailableMemorySegments());
- inputChannel.onSenderBacklog(12);
+ // Continue increasing the backlog
+ inputChannel.onSenderBacklog(11);
- // Already in the status of waiting for buffers and
will not request any more
- verify(bufferPool, times(11)).requestBuffer();
+ // The channel is already in the status of waiting for
buffers and will not request any more
+ verify(bufferPool, times(13)).requestBuffer();
verify(bufferPool,
times(1)).addBufferListener(inputChannel);
+ assertEquals("There should be 11 buffers available in
the channel", 11, inputChannel.getNumberOfAvailableBuffers());
+ assertEquals("There should be 13 buffers required in
the channel", 13, inputChannel.getNumberOfRequiredBuffers());
+ assertEquals("There should be 0 buffer available in
local pool", 0, bufferPool.getNumberOfAvailableMemorySegments());
+
+ // Recycle the floating buffer and assign it to the
buffer listener
+ floatingBuffer1.recycle();
+
+ // The channel is still waiting for one more floating
buffer
+ assertEquals("There should be 12 buffers available in
the channel", 12, inputChannel.getNumberOfAvailableBuffers());
+ assertEquals("There should be 13 buffers required in
the channel", 13, inputChannel.getNumberOfRequiredBuffers());
+ assertEquals("There should be 0 buffer available in
local pool", 0, bufferPool.getNumberOfAvailableMemorySegments());
+
+ // Recycle one more floating buffer again
+ floatingBuffer2.recycle();
+
+ // The channel already gets all the required buffers
+ assertEquals("There should be 13 buffers available in
the channel", 13, inputChannel.getNumberOfAvailableBuffers());
+ assertEquals("There should be 13 buffers required in
the channel", 13, inputChannel.getNumberOfRequiredBuffers());
+ assertEquals("There should be 0 buffer available in
local pool", 0, bufferPool.getNumberOfAvailableMemorySegments());
+
+ // Decrease the backlog and recycle one exclusive buffer
--- End diff --
Please make this two separate tests, i.e. check the invariants after
decreasing the backlog size (nothing should change here yet) and once again
after recycling an exclusive buffer (possibly integrated with the tests I
requested above).
> Implement Netty receiver incoming pipeline for credit-based
> -----------------------------------------------------------
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
> Issue Type: Sub-task
> Components: Network
> Reporter: zhijiang
> Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is part of the work for credit-based network flow control.
> Currently, {{PartitionRequestClientHandler}} receives and reads
> {{BufferResponse}} from the producer. It requests a buffer from {{BufferPool}}
> to hold the message. If no buffer is available, the message is staged
> temporarily and {{autoread}} for the channel is set to false.
> In credit-based mode, {{PartitionRequestClientHandler}} can always get a
> buffer from {{RemoteInputChannel}} for reading messages from the producer.
> The related work items are:
> * Add the backlog of the producer to the {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffers from
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates the backlog for
> {{RemoteInputChannel}}, which may trigger requests for floating buffers from
> {{BufferPool}}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)