GitHub user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r141901569
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
    @@ -209,6 +276,95 @@ public String toString() {
        }
     
        // ------------------------------------------------------------------------
    +   // Credit-based
    +   // ------------------------------------------------------------------------
    +
    +   /**
    +    * Enqueue this input channel in the pipeline for sending unannounced credits to producer.
    +    */
    +   void notifyCreditAvailable() {
    +           //TODO in next PR
    +   }
    +
    +   /**
    +    * Exclusive buffer is recycled to this input channel directly and it may trigger notify
    +    * credit to producer.
    +    *
    +    * @param segment The exclusive segment of this channel.
    +    */
    +   @Override
    +   public void recycle(MemorySegment segment) {
    +           synchronized (availableBuffers) {
    +                   // Important: the isReleased check should be inside the synchronized block.
    +                   // that way the segment can also be returned to global pool after added into
    +                   // the available queue during releasing all resources.
    +                   if (isReleased.get()) {
    +                           try {
    +                                   inputGate.returnExclusiveSegments(Arrays.asList(segment));
    +                                   return;
    +                           } catch (Throwable t) {
    +                                   ExceptionUtils.rethrow(t);
    +                           }
    +                   }
    +                   availableBuffers.add(new Buffer(segment, this));
    +           }
    +
    +           if (unannouncedCredit.getAndAdd(1) == 0) {
    +                   notifyCreditAvailable();
    +           }
    +   }
    +
    +   public int getNumberOfAvailableBuffers() {
    +           synchronized (availableBuffers) {
    +                   return availableBuffers.size();
    +           }
    +   }
    +
    +   /**
    +    * The Buffer pool notifies this channel of an available floating buffer. If the channel is released or
    +    * currently does not need extra buffers, the buffer should be recycled to the buffer pool. Otherwise,
    +    * the buffer will be added into the <tt>availableBuffers</tt> queue and the unannounced credit is
    +    * increased by one.
    +    *
    +    * @param buffer Buffer that becomes available in buffer pool.
    +    * @return True when this channel is waiting for more floating buffers, otherwise false.
    +    */
    +   @Override
    +   public boolean notifyBufferAvailable(Buffer buffer) {
    +           checkState(isWaitingForFloatingBuffers.get(), "This channel should be waiting for floating buffers.");
    +
    +           synchronized (availableBuffers) {
    +                   // Important: the isReleased check should be inside the synchronized block.
    +                   if (isReleased.get() || availableBuffers.size() >= senderBacklog.get()) {
    +                           isWaitingForFloatingBuffers.set(false);
    +                           buffer.recycle();
    +
    +                           return false;
    +                   }
    +
    +                   availableBuffers.add(buffer);
    +
    +                   if (unannouncedCredit.getAndAdd(1) == 0) {
    +                           notifyCreditAvailable();
    +                   }
    --- End diff --
    
    Can we do this credit notification outside the `synchronized` block?
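
    A minimal sketch of what moving it out could look like, using a hypothetical `CreditChannelSketch` class rather than the real `RemoteInputChannel`. It assumes only the queue mutation needs the lock, since `unannouncedCredit` is already an atomic counter. Buffers are plain `Object`s, the backlog is a fixed int, and the return value only reports whether the buffer was accepted; all of these are simplifications for illustration.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the channel; names and types are assumptions, not Flink's API. */
class CreditChannelSketch {
    private final ArrayDeque<Object> availableBuffers = new ArrayDeque<>();
    private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
    private final AtomicInteger notifications = new AtomicInteger(0);
    private final int senderBacklog;
    private volatile boolean released;

    CreditChannelSketch(int senderBacklog) {
        this.senderBacklog = senderBacklog;
    }

    /** Placeholder for the real notification; here it only counts invocations. */
    void notifyCreditAvailable() {
        notifications.incrementAndGet();
    }

    /** Returns true if the buffer was queued (simplified compared to the PR's contract). */
    boolean notifyBufferAvailable(Object buffer) {
        synchronized (availableBuffers) {
            // The released/backlog check and the queue update stay under the lock.
            if (released || availableBuffers.size() >= senderBacklog) {
                return false;
            }
            availableBuffers.add(buffer);
        }

        // The credit update is atomic on its own, so it can run after the lock is released.
        if (unannouncedCredit.getAndAdd(1) == 0) {
            notifyCreditAvailable();
        }
        return true;
    }

    int getUnannouncedCredit() {
        return unannouncedCredit.get();
    }

    int getNotifications() {
        return notifications.get();
    }
}
```

    With this shape, the monitor on `availableBuffers` is no longer held while `notifyCreditAvailable()` runs, which matters once that method does real work.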

