Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5423#discussion_r167604523
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ---
    @@ -55,69 +51,56 @@
        /** Flag indicating whether the subpartition has been released. */
        private volatile boolean isReleased;
     
    -   /** The number of non-event buffers currently in this subpartition. */
    -   @GuardedBy("buffers")
    -   private int buffersInBacklog;
    -
        // 
------------------------------------------------------------------------
     
        PipelinedSubpartition(int index, ResultPartition parent) {
                super(index, parent);
        }
     
        @Override
    -   public boolean add(Buffer buffer) throws IOException {
    -           checkNotNull(buffer);
    -
    -           // view reference accessible outside the lock, but assigned 
inside the locked scope
    -           final PipelinedSubpartitionView reader;
    +   public boolean add(BufferConsumer bufferConsumer) throws IOException {
    +           return add(bufferConsumer, false);
    +   }
     
    +   @Override
    +   public void flush() {
                synchronized (buffers) {
    -                   if (isFinished || isReleased) {
    -                           buffer.recycleBuffer();
    -                           return false;
    +                   if (readView != null) {
    +                           readView.notifyDataAvailable();
                        }
    -
    -                   // Add the buffer and update the stats
    -                   buffers.add(buffer);
    -                   reader = readView;
    -                   updateStatistics(buffer);
    -                   increaseBuffersInBacklog(buffer);
    -           }
    -
    -           // Notify the listener outside of the synchronized block
    -           if (reader != null) {
    -                   reader.notifyBuffersAvailable(1);
                }
    -
    -           return true;
        }
     
        @Override
        public void finish() throws IOException {
    -           final Buffer buffer = 
EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
    +           
add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
    +           LOG.debug("Finished {}.", this);
    +   }
     
    -           // view reference accessible outside the lock, but assigned 
inside the locked scope
    -           final PipelinedSubpartitionView reader;
    +   private boolean add(BufferConsumer bufferConsumer, boolean finish) 
throws IOException {
    +           checkNotNull(bufferConsumer);
     
                synchronized (buffers) {
                        if (isFinished || isReleased) {
    -                           return;
    +                           bufferConsumer.close();
    +                           return false;
                        }
     
    -                   buffers.add(buffer);
    -                   reader = readView;
    -                   updateStatistics(buffer);
    +                   // Add the bufferConsumer and update the stats
    +                   buffers.add(bufferConsumer);
    +                   updateStatistics(bufferConsumer);
    +                   increaseBuffersInBacklog(bufferConsumer);
     
    -                   isFinished = true;
    +                   if (finish) {
    +                           isFinished = true;
    +                           notifyDataAvailable();
    --- End diff --
    
    I noticed that this introduces a subtle change: unlike before, the 
notification to the listeners now happens under the lock of `buffers`. I just 
want to double-check that this will not have negative side effects on 
performance. Did this fix any correctness problems?


---

Reply via email to