zhijiangW commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r422847423



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
##########
@@ -234,11 +236,16 @@ private void writeAndFlushNextMessageIfPossible(final 
Channel channel) throws IO
                                                registerAvailableReader(reader);
                                        }
 
-                                       BufferResponse msg = new BufferResponse(
-                                               next.buffer(),
-                                               reader.getSequenceNumber(),
-                                               reader.getReceiverId(),
-                                               next.buffersInBacklog());
+                                       Object msg;
+                                       if (next.buffer() != null) {

Review comment:
       The different message path is distinguished in both 
`PartitionRequestQueue` and `CreditBasedSequenceNumberingViewReader` now. We 
can improve it to judge only in one place instead.
   
   1. Introduce `ServerOutboundMessage` class to extend `NettyMessage` and make 
`AddBacklog` and `BufferResponse` both extend `ServerOutboundMessage`.
   2. Introduce `NetworkSequenceViewReader#getNextMessage` instead of existing 
`NetworkSequenceViewReader#getNextBuffer`. And inside 
`CreditBasedSequenceNumberingViewReader` implementation, we can judge the 
condition for distinguish.
   
   ```
   public NettyMessage.ServerOutboundMessage getNextMessage() throws 
IOException {
                if (numCreditsAvailable == 0 && initialCredit == 0 && 
!subpartitionView.isAvailable(numCreditsAvailable)) {
                        return getBacklogMessage();
                } else {
                        return getNextBufferResponse();
                }
        }
   ```
   
   To do so we can also reduce the necessary transformation between 
`BufferAndAvailability` and `BufferResponse`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to