Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5423#discussion_r167883063
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
---
@@ -151,64 +213,55 @@ private void writeAndFlushNextMessageIfPossible(final
Channel channel) throws IO
BufferAndAvailability next = null;
try {
- if (channel.isWritable()) {
- while (true) {
- SequenceNumberingViewReader reader =
nonEmptyReader.poll();
-
- // No queue with available data. We
allow this here, because
- // of the write callbacks that are
executed after each write.
- if (reader == null) {
- return;
+ while (true) {
+ NetworkSequenceViewReader reader =
poolAvailableReader();
+
+ // No queue with available data. We allow this
here, because
+ // of the write callbacks that are executed
after each write.
+ if (reader == null) {
+ return;
+ }
+
+ next = reader.getNextBuffer();
+ if (next == null) {
+ if (!reader.isReleased()) {
+ continue;
}
+ markAsReleased(reader.getReceiverId());
+
+ Throwable cause =
reader.getFailureCause();
+ if (cause != null) {
+ ErrorResponse msg = new
ErrorResponse(
+ new
ProducerFailedException(cause),
+ reader.getReceiverId());
- next = reader.getNextBuffer();
-
- if (next == null) {
- if (reader.isReleased()) {
-
markAsReleased(reader.getReceiverId());
- Throwable cause =
reader.getFailureCause();
-
- if (cause != null) {
- ErrorResponse
msg = new ErrorResponse(
- new
ProducerFailedException(cause),
-
reader.getReceiverId());
-
-
ctx.writeAndFlush(msg);
- }
- } else {
- IllegalStateException
err = new IllegalStateException(
- "Bug in Netty
consumer logic: reader queue got notified by partition " +
- "about
available data, but none was available.");
-
handleException(ctx.channel(), err);
- return;
- }
- } else {
- // this channel was now removed
from the non-empty reader queue
- // we re-add it in case it has
more data, because in that case no
- // "non-empty" notification
will come for that reader from the queue.
- if (next.moreAvailable()) {
-
nonEmptyReader.add(reader);
- }
-
- BufferResponse msg = new
BufferResponse(
- next.buffer(),
-
reader.getSequenceNumber(),
- reader.getReceiverId(),
- 0);
-
- if
(isEndOfPartitionEvent(next.buffer())) {
-
reader.notifySubpartitionConsumed();
-
reader.releaseAllResources();
-
-
markAsReleased(reader.getReceiverId());
- }
-
- // Write and flush and wait
until this is done before
- // trying to continue with the
next buffer.
-
channel.writeAndFlush(msg).addListener(writeListener);
-
- return;
+ ctx.writeAndFlush(msg);
}
+ } else {
+ // This channel was now removed from
the available reader queue.
+ // We re-add it into the queue if it is
still available
+ if (next.moreAvailable()) {
--- End diff --
This looks like the most common case, and I wonder why we cannot just peek
the queue and only remove `reader` in the other cases?---
