Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167883063 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java --- @@ -151,64 +213,55 @@ private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IO BufferAndAvailability next = null; try { - if (channel.isWritable()) { - while (true) { - SequenceNumberingViewReader reader = nonEmptyReader.poll(); - - // No queue with available data. We allow this here, because - // of the write callbacks that are executed after each write. - if (reader == null) { - return; + while (true) { + NetworkSequenceViewReader reader = poolAvailableReader(); + + // No queue with available data. We allow this here, because + // of the write callbacks that are executed after each write. + if (reader == null) { + return; + } + + next = reader.getNextBuffer(); + if (next == null) { + if (!reader.isReleased()) { + continue; } + markAsReleased(reader.getReceiverId()); + + Throwable cause = reader.getFailureCause(); + if (cause != null) { + ErrorResponse msg = new ErrorResponse( + new ProducerFailedException(cause), + reader.getReceiverId()); - next = reader.getNextBuffer(); - - if (next == null) { - if (reader.isReleased()) { - markAsReleased(reader.getReceiverId()); - Throwable cause = reader.getFailureCause(); - - if (cause != null) { - ErrorResponse msg = new ErrorResponse( - new ProducerFailedException(cause), - reader.getReceiverId()); - - ctx.writeAndFlush(msg); - } - } else { - IllegalStateException err = new IllegalStateException( - "Bug in Netty consumer logic: reader queue got notified by partition " + - "about available data, but none was available."); - handleException(ctx.channel(), err); - return; - } - } else { - // this channel was now removed from the non-empty reader queue - // we re-add it in case it has more data, because in that case no - // "non-empty" notification will come for that reader from the queue. - if (next.moreAvailable()) { - nonEmptyReader.add(reader); - } - - BufferResponse msg = new BufferResponse( - next.buffer(), - reader.getSequenceNumber(), - reader.getReceiverId(), - 0); - - if (isEndOfPartitionEvent(next.buffer())) { - reader.notifySubpartitionConsumed(); - reader.releaseAllResources(); - - markAsReleased(reader.getReceiverId()); - } - - // Write and flush and wait until this is done before - // trying to continue with the next buffer. - channel.writeAndFlush(msg).addListener(writeListener); - - return; + ctx.writeAndFlush(msg); } + } else { + // This channel was now removed from the available reader queue. + // We re-add it into the queue if it is still available + if (next.moreAvailable()) { --- End diff -- This looks like the most common case, and I wonder why we cannot just peek the queue and only remove `reader` in the other cases?
---