GitHub user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6272#discussion_r201752234
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
    @@ -360,32 +360,44 @@ public boolean notifyBufferAvailable(Buffer buffer) {
                        return false;
                }
     
    -           boolean needMoreBuffers = false;
    -           synchronized (bufferQueue) {
    -                   checkState(isWaitingForFloatingBuffers, "This channel should be waiting for floating buffers.");
    +           boolean recycleBuffer = true;
    +           try {
    +                   boolean needMoreBuffers = false;
    +                   synchronized (bufferQueue) {
    +                           checkState(isWaitingForFloatingBuffers,
    +                                   "This channel should be waiting for floating buffers.");
    +
    +                           // Important: double check the isReleased state inside synchronized block, so there is no
    +                           // race condition when notifyBufferAvailable and releaseAllResources running in parallel.
    +                           if (isReleased.get() ||
    +                                   bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
    +                                   isWaitingForFloatingBuffers = false;
    +                                   buffer.recycleBuffer();
    +                                   return false;
    +                           }
     
    -                   // Important: double check the isReleased state inside synchronized block, so there is no
    -                   // race condition when notifyBufferAvailable and releaseAllResources running in parallel.
    -                   if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
    -                           isWaitingForFloatingBuffers = false;
    -                           buffer.recycleBuffer();
    -                           return false;
    -                   }
    +                           recycleBuffer = false;
    +                           bufferQueue.addFloatingBuffer(buffer);
     
    -                   bufferQueue.addFloatingBuffer(buffer);
    +                           if (bufferQueue.getAvailableBufferSize() == numRequiredBuffers) {
    +                                   isWaitingForFloatingBuffers = false;
    +                           } else {
    +                                   needMoreBuffers = true;
    +                           }
     
    -                   if (bufferQueue.getAvailableBufferSize() == numRequiredBuffers) {
    -                           isWaitingForFloatingBuffers = false;
    -                   } else {
    -                           needMoreBuffers =  true;
    +                           if (unannouncedCredit.getAndAdd(1) == 0) {
    +                                   notifyCreditAvailable();
    +                           }
                        }
    -           }
     
    -           if (unannouncedCredit.getAndAdd(1) == 0) {
    -                   notifyCreditAvailable();
    +                   return needMoreBuffers;
    +           } catch (Throwable t) {
    +                   if (recycleBuffer) {
    +                           buffer.recycleBuffer();
    +                   }
    +                   setError(t);
    +                   return false;
    --- End diff --
    
    Hmm, `return false`? Couldn't this deadlock? Shouldn't we return the
    correct value here? Maybe this `try`/`catch` should be tighter? That said,
    I don't fully understand what this `try`/`catch` is protecting us from.
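
To make the question concrete, here is a minimal, self-contained Java sketch of the ownership-flag pattern the diff introduces. It is not Flink code: `SketchBuffer`, `SketchChannel`, and the `Fail` switch are hypothetical stand-ins, and the simulated failures are assumptions chosen only to exercise the `catch` path. It illustrates that the buffer is recycled only while the method still owns it, and that the error path always reports `false`, which is the return value questioned above.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Illustrative only: hypothetical stand-ins, not Flink's real classes. */
public class RecycleOnFailureSketch {

    /** Where the simulated failure is thrown (an assumption for this sketch). */
    enum Fail { NONE, BEFORE_ENQUEUE, AFTER_ENQUEUE }

    /** Minimal buffer that only records whether it was recycled. */
    static final class SketchBuffer {
        boolean recycled;
        void recycleBuffer() { recycled = true; }
    }

    static final class SketchChannel {
        private final Deque<SketchBuffer> bufferQueue = new ArrayDeque<>();
        private final int numRequiredBuffers;
        private final Fail fail;
        Throwable error; // stands in for whatever setError(t) would record

        SketchChannel(int numRequiredBuffers, Fail fail) {
            this.numRequiredBuffers = numRequiredBuffers;
            this.fail = fail;
        }

        /** Returns true if the channel still wants more buffers. */
        boolean notifyBufferAvailable(SketchBuffer buffer) {
            boolean recycleBuffer = true; // this method still owns the buffer
            try {
                synchronized (bufferQueue) {
                    if (fail == Fail.BEFORE_ENQUEUE) {
                        throw new IllegalStateException("simulated failure before enqueue");
                    }
                    if (bufferQueue.size() >= numRequiredBuffers) {
                        buffer.recycleBuffer(); // not needed, hand it back
                        return false;
                    }
                    recycleBuffer = false; // ownership moves to the queue
                    bufferQueue.add(buffer);
                    if (fail == Fail.AFTER_ENQUEUE) {
                        throw new IllegalStateException("simulated failure after enqueue");
                    }
                    return bufferQueue.size() < numRequiredBuffers;
                }
            } catch (Throwable t) {
                if (recycleBuffer) {
                    buffer.recycleBuffer(); // avoid leaking a buffer nobody owns
                }
                error = t;
                return false; // the value under discussion: "no more buffers needed"
            }
        }
    }

    public static void main(String[] args) {
        SketchChannel channel = new SketchChannel(2, Fail.BEFORE_ENQUEUE);
        SketchBuffer buffer = new SketchBuffer();
        boolean needMore = channel.notifyBufferAvailable(buffer);
        System.out.println("needMore=" + needMore + ", recycled=" + buffer.recycled
                + ", error=" + channel.error);
    }
}
```

In this sketch the caller cannot tell "enough buffers" apart from "failed" by the return value alone; it would have to inspect `error`. Whether that is safe in the real channel depends on how the buffer pool reacts to a `false` result, which the sketch does not model.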

