Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5423#discussion_r167883063
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
 ---
    @@ -151,64 +213,55 @@ private void writeAndFlushNextMessageIfPossible(final 
Channel channel) throws IO
     
                BufferAndAvailability next = null;
                try {
    -                   if (channel.isWritable()) {
    -                           while (true) {
    -                                   SequenceNumberingViewReader reader = 
nonEmptyReader.poll();
    -
    -                                   // No queue with available data. We 
allow this here, because
    -                                   // of the write callbacks that are 
executed after each write.
    -                                   if (reader == null) {
    -                                           return;
    +                   while (true) {
    +                           NetworkSequenceViewReader reader = 
poolAvailableReader();
    +
    +                           // No queue with available data. We allow this 
here, because
    +                           // of the write callbacks that are executed 
after each write.
    +                           if (reader == null) {
    +                                   return;
    +                           }
    +
    +                           next = reader.getNextBuffer();
    +                           if (next == null) {
    +                                   if (!reader.isReleased()) {
    +                                           continue;
                                        }
    +                                   markAsReleased(reader.getReceiverId());
    +
    +                                   Throwable cause = 
reader.getFailureCause();
    +                                   if (cause != null) {
    +                                           ErrorResponse msg = new 
ErrorResponse(
    +                                                   new 
ProducerFailedException(cause),
    +                                                   reader.getReceiverId());
     
    -                                   next = reader.getNextBuffer();
    -
    -                                   if (next == null) {
    -                                           if (reader.isReleased()) {
    -                                                   
markAsReleased(reader.getReceiverId());
    -                                                   Throwable cause = 
reader.getFailureCause();
    -
    -                                                   if (cause != null) {
    -                                                           ErrorResponse 
msg = new ErrorResponse(
    -                                                                   new 
ProducerFailedException(cause),
    -                                                                   
reader.getReceiverId());
    -
    -                                                           
ctx.writeAndFlush(msg);
    -                                                   }
    -                                           } else {
    -                                                   IllegalStateException 
err = new IllegalStateException(
    -                                                           "Bug in Netty 
consumer logic: reader queue got notified by partition " +
    -                                                                   "about 
available data, but none was available.");
    -                                                   
handleException(ctx.channel(), err);
    -                                                   return;
    -                                           }
    -                                   } else {
    -                                           // this channel was now removed 
from the non-empty reader queue
    -                                           // we re-add it in case it has 
more data, because in that case no
    -                                           // "non-empty" notification 
will come for that reader from the queue.
    -                                           if (next.moreAvailable()) {
    -                                                   
nonEmptyReader.add(reader);
    -                                           }
    -
    -                                           BufferResponse msg = new 
BufferResponse(
    -                                                   next.buffer(),
    -                                                   
reader.getSequenceNumber(),
    -                                                   reader.getReceiverId(),
    -                                                   0);
    -
    -                                           if 
(isEndOfPartitionEvent(next.buffer())) {
    -                                                   
reader.notifySubpartitionConsumed();
    -                                                   
reader.releaseAllResources();
    -
    -                                                   
markAsReleased(reader.getReceiverId());
    -                                           }
    -
    -                                           // Write and flush and wait 
until this is done before
    -                                           // trying to continue with the 
next buffer.
    -                                           
channel.writeAndFlush(msg).addListener(writeListener);
    -
    -                                           return;
    +                                           ctx.writeAndFlush(msg);
                                        }
    +                           } else {
    +                                   // This channel was now removed from 
the available reader queue.
    +                                   // We re-add it into the queue if it is 
still available
    +                                   if (next.moreAvailable()) {
    --- End diff --
    
    This looks like the most common case, and I wonder why we cannot just peek 
the queue and only remove `reader` in the other cases?


---

Reply via email to