Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5423#discussion_r167857194
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---
    @@ -55,69 +51,56 @@
        /** Flag indicating whether the subpartition has been released. */
        private volatile boolean isReleased;
     
    -   /** The number of non-event buffers currently in this subpartition. */
    -   @GuardedBy("buffers")
    -   private int buffersInBacklog;
    -
        // ------------------------------------------------------------------------
     
        PipelinedSubpartition(int index, ResultPartition parent) {
                super(index, parent);
        }
     
        @Override
    -   public boolean add(Buffer buffer) throws IOException {
    -           checkNotNull(buffer);
    -
    -           // view reference accessible outside the lock, but assigned inside the locked scope
    -           final PipelinedSubpartitionView reader;
    +   public boolean add(BufferConsumer bufferConsumer) throws IOException {
    +           return add(bufferConsumer, false);
    +   }
     
    +   @Override
    +   public void flush() {
                synchronized (buffers) {
    -                   if (isFinished || isReleased) {
    -                           buffer.recycleBuffer();
    -                           return false;
    +                   if (readView != null) {
    +                           readView.notifyDataAvailable();
                        }
    -
    -                   // Add the buffer and update the stats
    -                   buffers.add(buffer);
    -                   reader = readView;
    -                   updateStatistics(buffer);
    -                   increaseBuffersInBacklog(buffer);
    -           }
    -
    -           // Notify the listener outside of the synchronized block
    -           if (reader != null) {
    -                   reader.notifyBuffersAvailable(1);
                }
    -
    -           return true;
        }
     
        @Override
        public void finish() throws IOException {
    -           final Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
    +           add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
    +           LOG.debug("Finished {}.", this);
    +   }
     
    -           // view reference accessible outside the lock, but assigned inside the locked scope
    -           final PipelinedSubpartitionView reader;
    +   private boolean add(BufferConsumer bufferConsumer, boolean finish) throws IOException {
    --- End diff --
    
    I think you can remove the `throws IOException` from this private `add(...)`, and after that also from the public `add(...)`.
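
A minimal sketch of why the suggestion compiles, assuming the body of the private `add(...)` (not visible in this diff excerpt) contains nothing that throws a checked `IOException`. Java allows an override to declare fewer checked exceptions than the method it overrides, so the public `add(...)` can drop the clause even if the parent class still declares it. All names here (`ThrowsNarrowingSketch`, `SubpartitionSketch`, `PipelinedSketch`, the `String` items) are hypothetical stand-ins, not Flink classes.

```java
import java.io.IOException;
import java.util.ArrayDeque;

public class ThrowsNarrowingSketch {

    /** Hypothetical stand-in for the parent class, which still declares the checked exception. */
    abstract static class SubpartitionSketch {
        abstract boolean add(String item) throws IOException;
    }

    /** Hypothetical stand-in for the subclass under review. */
    static final class PipelinedSketch extends SubpartitionSketch {
        private final ArrayDeque<String> buffers = new ArrayDeque<>();
        private boolean isFinished;

        // An override may declare fewer checked exceptions than the overridden method.
        @Override
        boolean add(String item) {
            return add(item, false);
        }

        // Plain method in this sketch; it reuses the private helper like the diff does.
        void finish() {
            add("END_OF_PARTITION", true);
        }

        // Nothing in this body throws IOException, so no throws clause is needed.
        private boolean add(String item, boolean finish) {
            synchronized (buffers) {
                if (isFinished) {
                    return false;
                }
                buffers.add(item);
                isFinished |= finish;
                return true;
            }
        }

        int size() {
            synchronized (buffers) {
                return buffers.size();
            }
        }
    }

    public static void main(String[] args) {
        PipelinedSketch partition = new PipelinedSketch();
        // No try/catch is required when calling through the concrete type.
        System.out.println(partition.add("a"));
        partition.finish();
        System.out.println(partition.add("b"));
        System.out.println(partition.size());
    }
}
```

Callers that hold a reference of the parent type would still need to handle `IOException`, since the clause remains on the abstract declaration; only callers using the concrete type benefit.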

