[ https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365619#comment-16365619 ]
ASF GitHub Bot commented on FLINK-8581: --------------------------------------- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168490355 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java --- @@ -151,64 +213,55 @@ private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IO BufferAndAvailability next = null; try { - if (channel.isWritable()) { - while (true) { - SequenceNumberingViewReader reader = nonEmptyReader.poll(); - - // No queue with available data. We allow this here, because - // of the write callbacks that are executed after each write. - if (reader == null) { - return; + while (true) { + NetworkSequenceViewReader reader = poolAvailableReader(); + + // No queue with available data. We allow this here, because + // of the write callbacks that are executed after each write. + if (reader == null) { + return; + } + + next = reader.getNextBuffer(); + if (next == null) { + if (!reader.isReleased()) { + continue; } + markAsReleased(reader.getReceiverId()); + + Throwable cause = reader.getFailureCause(); + if (cause != null) { + ErrorResponse msg = new ErrorResponse( + new ProducerFailedException(cause), + reader.getReceiverId()); - next = reader.getNextBuffer(); - - if (next == null) { - if (reader.isReleased()) { - markAsReleased(reader.getReceiverId()); - Throwable cause = reader.getFailureCause(); - - if (cause != null) { - ErrorResponse msg = new ErrorResponse( - new ProducerFailedException(cause), - reader.getReceiverId()); - - ctx.writeAndFlush(msg); - } - } else { - IllegalStateException err = new IllegalStateException( - "Bug in Netty consumer logic: reader queue got notified by partition " + - "about available data, but none was available."); - handleException(ctx.channel(), err); - return; - } - } else { - // this channel was now removed from the non-empty reader queue - // we re-add it in case it has more 
data, because in that case no - // "non-empty" notification will come for that reader from the queue. - if (next.moreAvailable()) { - nonEmptyReader.add(reader); - } - - BufferResponse msg = new BufferResponse( - next.buffer(), - reader.getSequenceNumber(), - reader.getReceiverId(), - 0); - - if (isEndOfPartitionEvent(next.buffer())) { - reader.notifySubpartitionConsumed(); - reader.releaseAllResources(); - - markAsReleased(reader.getReceiverId()); - } - - // Write and flush and wait until this is done before - // trying to continue with the next buffer. - channel.writeAndFlush(msg).addListener(writeListener); - - return; + ctx.writeAndFlush(msg); } + } else { + // This channel was now removed from the available reader queue. + // We re-add it into the queue if it is still available + if (next.moreAvailable()) { --- End diff -- This is not the most common case: except in super-low-latency cases, the network is much faster than our ability to produce data. Secondly, there are three branches that we need to cover here. As it is now, we `poll` the reader once and re-enqueue it only once (in the case you commented on). With `peek`, we would have to `pop` it in two places. > Improve performance for low latency network > ------------------------------------------- > > Key: FLINK-8581 > URL: https://issues.apache.org/jira/browse/FLINK-8581 > Project: Flink > Issue Type: Improvement > Components: Network > Affects Versions: 1.5.0 > Reporter: Piotr Nowojski > Assignee: Piotr Nowojski > Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)