[
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16196771#comment-16196771
]
ASF GitHub Bot commented on FLINK-7406:
---------------------------------------
Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4509#discussion_r143230905
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
@@ -306,10 +311,27 @@ public void recycle(MemorySegment segment) {
ExceptionUtils.rethrow(t);
}
}
+
+ // Recycle the extra floating buffers in order not to stack up 2*initialCredit
+ // buffers once current backlog is 0
+ if (senderBacklog.get() == 0 && availableBuffers.size() >= initialCredit) {
+ final int size = availableBuffers.size();
+ for (int i = 0; i < size; i++) {
+ final Buffer buffer = availableBuffers.poll();
--- End diff --
I wonder whether it would be simpler to keep two separate lists of
buffers: one for exclusive buffers and one for floating buffers. Then you would
at least not have to iterate through the list. When retrieving buffers, you would
always take the floating buffers first, so that we do not get into this situation
too often. If that helps, we could also extract this logic into a separate
`AvailableBuffersQueue` class or similar, which may also make it easier to
test.
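
To make the idea concrete, here is a minimal sketch of what such a class might look like. It is only an illustration under stated assumptions, not the code in the pull request: the method names are hypothetical, the generic type `B` stands in for Flink's `Buffer` so that the example is self-contained, and the class is not thread-safe, so the caller is assumed to hold whatever lock already guards `availableBuffers`.

```java
import java.util.ArrayDeque;

/**
 * Hypothetical sketch of the suggested AvailableBuffersQueue.
 * B stands in for Flink's Buffer type; all method names are assumptions.
 * Not thread-safe: the caller is assumed to synchronize access.
 */
final class AvailableBuffersQueue<B> {

    /** Buffers permanently assigned to this channel. */
    private final ArrayDeque<B> exclusiveBuffers = new ArrayDeque<>();

    /** Buffers borrowed from the shared pool that can be handed back. */
    private final ArrayDeque<B> floatingBuffers = new ArrayDeque<>();

    void addExclusive(B buffer) {
        exclusiveBuffers.add(buffer);
    }

    void addFloating(B buffer) {
        floatingBuffers.add(buffer);
    }

    /**
     * Takes a floating buffer first and falls back to an exclusive one.
     * Returns null if no buffer is available.
     */
    B poll() {
        B buffer = floatingBuffers.poll();
        return buffer != null ? buffer : exclusiveBuffers.poll();
    }

    /** Removes one floating buffer so that the caller can release it, or returns null. */
    B pollFloating() {
        return floatingBuffers.poll();
    }

    int size() {
        return exclusiveBuffers.size() + floatingBuffers.size();
    }

    int floatingSize() {
        return floatingBuffers.size();
    }
}
```

With this split, releasing the extra floating buffers once the backlog is 0 would reduce to calling `pollFloating()` until it returns null, without inspecting the exclusive buffers at all.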
> Implement Netty receiver incoming pipeline for credit-based
> -----------------------------------------------------------
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
> Issue Type: Sub-task
> Components: Network
> Reporter: zhijiang
> Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is part of the work on credit-based network flow control.
> Currently, {{PartitionRequestClientHandler}} receives and reads
> {{BufferResponse}} messages from the producer. It requests a buffer from the
> {{BufferPool}} to hold each message. If no buffer is available, the message is
> staged temporarily and {{autoread}} is set to false for the channel.
> In credit-based mode, {{PartitionRequestClientHandler}} can always get a
> buffer from {{RemoteInputChannel}} for reading messages from the producer.
> The related work items are:
> * Add the producer's backlog to the {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffers from
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates the backlog for
> {{RemoteInputChannel}}, which may trigger requests for floating buffers from
> the {{BufferPool}}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)