Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6272#discussion_r201242662
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
    @@ -360,32 +360,45 @@ public boolean notifyBufferAvailable(Buffer buffer) {
                        return false;
                }
     
    -           boolean needMoreBuffers = false;
    -           synchronized (bufferQueue) {
    -                   checkState(isWaitingForFloatingBuffers, "This channel should be waiting for floating buffers.");
    +           boolean recycleBuffer = true;
    +           try {
    +                   boolean needMoreBuffers = false;
    +                   synchronized (bufferQueue) {
    +                           checkState(isWaitingForFloatingBuffers,
    +                                   "This channel should be waiting for floating buffers.");
    +
    +                           // Important: double check the isReleased state inside synchronized block, so there is no
    +                           // race condition when notifyBufferAvailable and releaseAllResources running in parallel.
    +                           if (isReleased.get() ||
    +                                   bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
    +                                   isWaitingForFloatingBuffers = false;
    +                                   buffer.recycleBuffer();
    +                                   return false;
    +                           }
     
    -                   // Important: double check the isReleased state inside synchronized block, so there is no
    -                   // race condition when notifyBufferAvailable and releaseAllResources running in parallel.
    -                   if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
    -                           isWaitingForFloatingBuffers = false;
    -                           buffer.recycleBuffer();
    -                           return false;
    -                   }
    +                           // note: this call may fail, for better cleanup, increase the counter first
    +                           if (unannouncedCredit.getAndAdd(1) == 0) {
    +                                   notifyCreditAvailable();
    --- End diff --
    
    From a failure-handling point of view, `notifyCreditAvailable` should be called before `bufferQueue.addFloatingBuffer`. From a logical point of view, however, it seems stricter to confirm that the buffer is queued before announcing the credit; otherwise the channel may receive new data before this floating buffer has been queued, although that can hardly happen with the current implementation.
    
    Another concern is that `notifyCreditAvailable` is itself thread safe. But to keep failure handling simple, we place it inside the `synchronized` block. The call is currently very lightweight, so the cost is negligible.
    
    Maybe these two concerns are unnecessary, and I can accept the current modifications.
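
    To make the ordering trade-off from the first concern concrete, here is a minimal sketch of the "count and notify first, queue afterwards" order. It is not Flink's `RemoteInputChannel`: the class `CreditOrderingSketch`, its `Runnable` notifier, and the plain `ArrayDeque` standing in for the buffer queue are all hypothetical names chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the channel; it only models the call ordering. */
class CreditOrderingSketch {
    private final ArrayDeque<Object> bufferQueue = new ArrayDeque<>();
    private final AtomicInteger unannouncedCredit = new AtomicInteger();
    private final Runnable notifyCreditAvailable; // assumed: may throw
    private final int numRequiredBuffers;

    CreditOrderingSketch(int numRequiredBuffers, Runnable notifyCreditAvailable) {
        this.numRequiredBuffers = numRequiredBuffers;
        this.notifyCreditAvailable = notifyCreditAvailable;
    }

    /** Returns true if the channel still needs more floating buffers. */
    boolean onBufferAvailable(Object buffer) {
        synchronized (bufferQueue) {
            // Increase the counter first, then notify only on the 0 -> 1 transition.
            if (unannouncedCredit.getAndAdd(1) == 0) {
                notifyCreditAvailable.run();
            }
            // Reached only if the notification did not throw, so on failure
            // the caller still owns the buffer and can recycle it.
            bufferQueue.add(buffer);
            return bufferQueue.size() < numRequiredBuffers;
        }
    }

    int queuedBuffers() {
        synchronized (bufferQueue) {
            return bufferQueue.size();
        }
    }

    int unannouncedCredit() {
        return unannouncedCredit.get();
    }
}
```

    With this order, a failing notification leaves the queue untouched, which keeps cleanup simple. The price is the window described above: the credit is announced slightly before the buffer is actually queued. Swapping the two steps closes that window but means a failed notification has to be undone against an already queued buffer.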


---
