Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4509#discussion_r141896488
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ---
    @@ -412,23 +468,35 @@ public void onBuffer(Buffer buffer, int 
sequenceNumber) {
                                        }
                                }
                        }
    +
    +                   if (success && backlog > 0) {
    +                           onSenderBacklog(backlog);
    +                   }
    +
                } finally {
                        if (!success) {
                                buffer.recycle();
                        }
                }
        }
     
    -   public void onEmptyBuffer(int sequenceNumber) {
    +   public void onEmptyBuffer(int sequenceNumber, int backlog) {
    +           boolean success = false;
    +
                synchronized (receivedBuffers) {
                        if (!isReleased.get()) {
                                if (expectedSequenceNumber == sequenceNumber) {
                                        expectedSequenceNumber++;
    +                                   success = true;
                                } else {
                                        onError(new 
BufferReorderingException(expectedSequenceNumber, sequenceNumber));
                                }
                        }
                }
    +
    +           if (success && backlog > 0) {
    --- End diff --
    
    same here


---

Reply via email to