Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5423#discussion_r168490355
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -151,64 +213,55 @@ private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IO
     
                BufferAndAvailability next = null;
                try {
    -                   if (channel.isWritable()) {
    -                           while (true) {
    -                                   SequenceNumberingViewReader reader = nonEmptyReader.poll();
    -
    -                                   // No queue with available data. We allow this here, because
    -                                   // of the write callbacks that are executed after each write.
    -                                   if (reader == null) {
    -                                           return;
    +                   while (true) {
    +                           NetworkSequenceViewReader reader = poolAvailableReader();
    +
    +                           // No queue with available data. We allow this here, because
    +                           // of the write callbacks that are executed after each write.
    +                           if (reader == null) {
    +                                   return;
    +                           }
    +
    +                           next = reader.getNextBuffer();
    +                           if (next == null) {
    +                                   if (!reader.isReleased()) {
    +                                           continue;
                                        }
    +                                   markAsReleased(reader.getReceiverId());
    +
    +                                   Throwable cause = reader.getFailureCause();
    +                                   if (cause != null) {
    +                                           ErrorResponse msg = new ErrorResponse(
    +                                                   new ProducerFailedException(cause),
    +                                                   reader.getReceiverId());
     
    -                                   next = reader.getNextBuffer();
    -
    -                                   if (next == null) {
    -                                           if (reader.isReleased()) {
    -                                                   markAsReleased(reader.getReceiverId());
    -                                                   Throwable cause = reader.getFailureCause();
    -
    -                                                   if (cause != null) {
    -                                                           ErrorResponse msg = new ErrorResponse(
    -                                                                   new ProducerFailedException(cause),
    -                                                                   reader.getReceiverId());
    -
    -                                                           ctx.writeAndFlush(msg);
    -                                                   }
    -                                           } else {
    -                                                   IllegalStateException err = new IllegalStateException(
    -                                                           "Bug in Netty consumer logic: reader queue got notified by partition " +
    -                                                                   "about available data, but none was available.");
    -                                                   handleException(ctx.channel(), err);
    -                                                   return;
    -                                           }
    -                                   } else {
    -                                           // this channel was now removed from the non-empty reader queue
    -                                           // we re-add it in case it has more data, because in that case no
    -                                           // "non-empty" notification will come for that reader from the queue.
    -                                           if (next.moreAvailable()) {
    -                                                   nonEmptyReader.add(reader);
    -                                           }
    -
    -                                           BufferResponse msg = new BufferResponse(
    -                                                   next.buffer(),
    -                                                   reader.getSequenceNumber(),
    -                                                   reader.getReceiverId(),
    -                                                   0);
    -
    -                                           if (isEndOfPartitionEvent(next.buffer())) {
    -                                                   reader.notifySubpartitionConsumed();
    -                                                   reader.releaseAllResources();
    -
    -                                                   markAsReleased(reader.getReceiverId());
    -                                           }
    -
    -                                           // Write and flush and wait until this is done before
    -                                           // trying to continue with the next buffer.
    -                                           channel.writeAndFlush(msg).addListener(writeListener);
    -
    -                                           return;
    +                                           ctx.writeAndFlush(msg);
                                        }
    +                           } else {
    +                                   // This channel was now removed from the available reader queue.
    +                                   // We re-add it into the queue if it is still available
    +                                   if (next.moreAvailable()) {
    --- End diff --
    
    This is not the most common case: except in very low latency setups, the network is much faster than our ability to produce data.
    
    Secondly, there are three branches that we need to cover here. As it is now, we `poll` the reader once and re-enqueue it in only one place (the case you commented on). With `peek` we would have to `pop` it in two places.
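    
    To make the three branches concrete, here is a minimal sketch of the poll-once pattern. It is not the actual `PartitionRequestQueue` code: `Reader`, `writeNext` and `releasedLog` are hypothetical stand-ins, and a buffer is reduced to a `String` label.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class PollOnceSketch {

    /** Hypothetical stand-in for a view reader; not Flink's NetworkSequenceViewReader. */
    static final class Reader {
        final String name;
        final boolean released;
        int remaining; // number of buffers this reader can still hand out

        Reader(String name, int remaining, boolean released) {
            this.name = name;
            this.remaining = remaining;
            this.released = released;
        }

        /** Returns a label for the next buffer, or null if none is available. */
        String next() {
            if (remaining == 0) {
                return null;
            }
            remaining--;
            return name + "#" + remaining;
        }

        boolean moreAvailable() {
            return remaining > 0;
        }
    }

    /**
     * Polls readers until one yields a buffer. The reader is removed from the
     * queue exactly once (by poll) and re-added in exactly one branch.
     * Returns null when the queue is drained.
     */
    static String writeNext(Queue<Reader> available, List<String> releasedLog) {
        while (true) {
            Reader reader = available.poll();
            if (reader == null) {
                return null; // branch 1: no reader with data
            }
            String buffer = reader.next();
            if (buffer == null) {
                // branch 2: nothing to send; the reader is already off the queue
                if (reader.released) {
                    releasedLog.add(reader.name);
                }
                continue;
            }
            // branch 3: the only place where the reader goes back into the queue
            if (reader.moreAvailable()) {
                available.add(reader);
            }
            return buffer;
        }
    }

    public static void main(String[] args) {
        Queue<Reader> queue = new ArrayDeque<>();
        queue.add(new Reader("a", 0, true));
        queue.add(new Reader("b", 2, false));
        List<String> released = new ArrayList<>();
        String buffer;
        while ((buffer = writeNext(queue, released)) != null) {
            System.out.println("wrote " + buffer);
        }
        System.out.println("released " + released);
    }
}
```

    With `peek` in place of `poll`, the reader would stay in the queue, so it would have to be removed explicitly both in branch 2 and in branch 3 when `moreAvailable()` is false.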
