[ 
https://issues.apache.org/jira/browse/FLINK-7699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16182849#comment-16182849
 ] 

ASF GitHub Bot commented on FLINK-7699:
---------------------------------------

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4735#discussion_r141398906
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
    @@ -87,6 +88,12 @@
        /** The number of available buffers that have not been announced to the producer yet. */
        private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
     
    +   /** The number of unsent buffers in the producer's sub partition. */
    +   private final AtomicInteger senderBacklog = new AtomicInteger(0);
    +
    +   /** The tag indicates whether this channel is waiting for additional floating buffers from the buffer pool. */
    +   private final AtomicBoolean isWaitingFloatingBuffers = new AtomicBoolean(false);
    --- End diff --
    
    These two fields are already used in the `notifyBufferAvailable()` logic, so
    we have to define them in this PR.
    In the next PR, #4509, the `isWaitingForFloatingBuffers` field will be set to
    true when two conditions are met:
    1. The number of currently available buffers is less than the sender backlog.
    2. There are no available floating buffers in `BufferProvider`.
    This field is also used to avoid registering the listener in `BufferProvider`
    multiple times.
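
A minimal sketch of how the two conditions and the flag could interact, assuming both conditions must hold before the flag is set. The class `FloatingBufferRequestSketch`, its plain `int` counters, and the method `onSenderBacklog` are hypothetical stand-ins for illustration; they are not the code in this PR or in #4509.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the channel; not the real RemoteInputChannel. */
final class FloatingBufferRequestSketch {
    private final AtomicInteger senderBacklog = new AtomicInteger(0);
    private final AtomicBoolean isWaitingForFloatingBuffers = new AtomicBoolean(false);

    // Plain counters replace the real buffer queue and BufferProvider (assumption).
    private int availableBuffers;
    private int floatingBuffersInPool;
    private int listenerRegistrations;

    FloatingBufferRequestSketch(int availableBuffers, int floatingBuffersInPool) {
        this.availableBuffers = availableBuffers;
        this.floatingBuffersInPool = floatingBuffersInPool;
    }

    /** Records the backlog, then takes floating buffers until it is covered or the pool is empty. */
    void onSenderBacklog(int backlog) {
        senderBacklog.set(backlog);
        // Condition 1: fewer available buffers than the sender backlog.
        while (availableBuffers < senderBacklog.get()) {
            if (floatingBuffersInPool > 0) {
                floatingBuffersInPool--;
                availableBuffers++;
                continue;
            }
            // Condition 2: no floating buffer left in the pool.
            // compareAndSet ensures the listener is registered at most once.
            if (isWaitingForFloatingBuffers.compareAndSet(false, true)) {
                listenerRegistrations++;
            }
            return;
        }
    }

    boolean isWaiting() {
        return isWaitingForFloatingBuffers.get();
    }

    int listenerRegistrations() {
        return listenerRegistrations;
    }

    int availableBuffers() {
        return availableBuffers;
    }

    public static void main(String[] args) {
        FloatingBufferRequestSketch channel = new FloatingBufferRequestSketch(1, 1);
        channel.onSenderBacklog(4); // takes the one floating buffer, then starts waiting
        channel.onSenderBacklog(5); // still short of buffers, but no second registration
        System.out.println(channel.isWaiting() + " " + channel.listenerRegistrations());
    }
}
```

In this sketch the second call finds the flag already set, so the registration counter stays at one. The counters are not thread-safe; only the flag handling is meant to illustrate the idea.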
    
    Seeing only the changes in this PR may be confusing; sorry for that.


> Define the BufferListener interface to replace EventListener in BufferProvider
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-7699
>                 URL: https://issues.apache.org/jira/browse/FLINK-7699
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Core
>            Reporter: zhijiang
>            Assignee: zhijiang
>             Fix For: 1.4.0
>
>
> Currently the {{EventListener}} is used in {{BufferProvider}} to be notified 
> when a buffer becomes available or the pool is destroyed. 
> To make the semantics clearer, we define a new {{BufferListener}} interface whose 
> implementations can opt for a one-time-only notification or be notified repeatedly. 
> The pool can also signal that it was destroyed via an explicit 
> {{notifyBufferDestroyed}} method.
> The {{RemoteInputChannel}} will implement the {{BufferListener}} interface to 
> wait for floating buffers from {{BufferProvider}}.
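
One way to picture the interface described above is the following sketch. It assumes the listener chooses between one-time and repeated notification through a boolean return value, which the description does not specify. `ToyBufferPool`, `CountingListener`, `addBufferListener`, `recycle`, and the use of `String` as a buffer type are hypothetical and are not Flink's actual classes or signatures.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class BufferListenerSketch {

    /** Hypothetical listener shape; the return value is an assumed design. */
    interface BufferListener {
        /** Returns true to stay registered, false for a one-time-only notification. */
        boolean notifyBufferAvailable(String buffer);

        /** Called when the pool is destroyed while the listener is still registered. */
        void notifyBufferDestroyed();
    }

    /** Toy pool standing in for BufferProvider. */
    static final class ToyBufferPool {
        private final Queue<BufferListener> listeners = new ArrayDeque<>();

        void addBufferListener(BufferListener listener) {
            listeners.add(listener);
        }

        /** Hands a recycled buffer to the first waiting listener; false if nobody waits. */
        boolean recycle(String buffer) {
            BufferListener listener = listeners.poll();
            if (listener == null) {
                return false;
            }
            if (listener.notifyBufferAvailable(buffer)) {
                listeners.add(listener); // listener asked to be notified again
            }
            return true;
        }

        void destroy() {
            BufferListener listener;
            while ((listener = listeners.poll()) != null) {
                listener.notifyBufferDestroyed();
            }
        }
    }

    /** Listener that keeps asking for buffers until it has received {@code needed} of them. */
    static final class CountingListener implements BufferListener {
        final List<String> received = new ArrayList<>();
        boolean destroyed;
        private final int needed;

        CountingListener(int needed) {
            this.needed = needed;
        }

        @Override
        public boolean notifyBufferAvailable(String buffer) {
            received.add(buffer);
            return received.size() < needed;
        }

        @Override
        public void notifyBufferDestroyed() {
            destroyed = true;
        }
    }

    public static void main(String[] args) {
        ToyBufferPool pool = new ToyBufferPool();
        CountingListener listener = new CountingListener(2);
        pool.addBufferListener(listener);
        pool.recycle("b1");
        pool.recycle("b2");
        pool.recycle("b3"); // no listener is registered any more
        System.out.println(listener.received); // prints [b1, b2]
    }
}
```

A listener constructed with `needed = 1` would behave as a one-time listener here, because it returns false on its first notification.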



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
