wsry commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r666626886



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -312,6 +323,16 @@ BufferAndBacklog pollBuffer() {
                     decreaseBuffersInBacklogUnsafe(bufferConsumer.isBuffer());
                 }
 
+                // if we have an empty finished buffer and the exclusive credit is 0, we just return
+                // the empty buffer so that the downstream task can release the allocated credit for
+                // this empty buffer, this happens in two main scenarios currently:
+                // 1. all data of a buffer builder has been read and after that the buffer builder
+                // is finished
+                // 2. in approximate recovery mode, a partial record takes a whole buffer builder
+                if (buffersPerChannel == 0 && bufferConsumer.isFinished()) {
+                    break;
+                }
+
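
To make the early return above concrete, here is a minimal, simplified model of the polling loop. It is a sketch under stated assumptions, not Flink's actual `pollBuffer()`: `EmptyBufferPollSketch` and its `Consumer` class are hypothetical stand-ins that only track readable bytes and a finished flag, and backlog accounting, events and real buffer recycling are left out. In this model, with `buffersPerChannel == 0` an empty, finished consumer is handed downstream as an empty buffer, while with exclusive credits it is simply skipped.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class EmptyBufferPollSketch {

    /** Hypothetical stand-in for a BufferConsumer: readable bytes plus a finished flag. */
    static final class Consumer {
        int readable;
        final boolean finished;

        Consumer(int readable, boolean finished) {
            this.readable = readable;
            this.finished = finished;
        }
    }

    private final Deque<Consumer> queue = new ArrayDeque<>();
    private final int buffersPerChannel; // exclusive credits configured per channel

    EmptyBufferPollSketch(int buffersPerChannel) {
        this.buffersPerChannel = buffersPerChannel;
    }

    void add(Consumer consumer) {
        queue.add(consumer);
    }

    /** Returns the size of the buffer handed downstream, 0 for an empty buffer, or -1 for none. */
    int poll() {
        while (!queue.isEmpty()) {
            Consumer consumer = queue.peek();
            int read = consumer.readable;
            consumer.readable = 0; // drain whatever is readable right now
            if (consumer.finished) {
                queue.poll(); // a finished consumer will never produce more data
            }
            if (read > 0) {
                return read; // regular data buffer
            }
            if (!consumer.finished) {
                return -1; // nothing readable yet, wait for the writer
            }
            if (buffersPerChannel == 0) {
                return 0; // hand the empty buffer downstream so its credit gets released
            }
            // with exclusive credits the empty buffer is just recycled; try the next one
        }
        return -1;
    }

    public static void main(String[] args) {
        for (int exclusive : new int[] {0, 2}) {
            EmptyBufferPollSketch sub = new EmptyBufferPollSketch(exclusive);
            sub.add(new Consumer(0, true)); // scenario 1: data already read, then finished
            sub.add(new Consumer(5, true));
            System.out.println(exclusive + " exclusive: " + sub.poll() + ", " + sub.poll());
        }
    }
}
```

Running `main` prints `0 exclusive: 0, 5` and `2 exclusive: 5, -1`: only in the zero-exclusive-credit case does the empty buffer reach the receiver, which is what lets it give back the credit it had allocated for that buffer.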

Review comment:
       > One thing to add (unless we are not doing it already) would be to use this backlog information, to maybe release floating buffers if backlog dropped to 0?
   
   Currently, we are not doing that. Actually, I think it is a little
   complicated to do so, because we need to keep the sender-side available
   credit consistent with the receiver-side floating buffers. If we only
   release the floating buffers on the receiver side and the sender-side
   available credit is not reset, the sender may send out data for which
   there are no buffers on the receiver side. If we also reset the available
   credit on the sender side when the backlog is 0, some AddCredit messages
   may still be in flight, and the credit they carry would not be reset. One
   possible way is for the sender to stop sending any data after it sends a
   buffer with backlog 0; the receivers then clear all floating credits and
   send a reset message to the senders, and the senders reset all available
   credits. This process is similar to channel blocking and resumption. I
   think it is a little complicated and can incur extra overhead.
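
   The reset handshake proposed above can be simulated in a few lines. This is
   a toy sketch with hypothetical names (`Sender`, `Receiver`,
   `ToSender.ADD_CREDIT`, `ToSender.RESET_CREDIT`), not Flink's Netty message
   classes. It assumes receiver-to-sender messages are delivered in FIFO order
   over one connection, so any AddCredit sent before the reset is applied
   before it. The scenario: the receiver grants 3 floating credits, only 2
   reach the sender, the sender sends its last buffer with backlog 0, and the
   receiver then releases its unused floating buffers.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class CreditResetSketch {

    /** Hypothetical receiver-to-sender messages, delivered in FIFO order (assumption). */
    enum ToSender { ADD_CREDIT, RESET_CREDIT }

    static final class Sender {
        final boolean useHandshake;
        int availableCredit;
        boolean blocked; // set after a buffer announcing backlog 0 when the handshake is used

        Sender(boolean useHandshake) {
            this.useHandshake = useHandshake;
        }

        /** Sends one buffer if allowed and returns whether it went out. */
        boolean send(int backlogAfterThisBuffer) {
            if (blocked || availableCredit == 0) {
                return false;
            }
            availableCredit--;
            if (useHandshake && backlogAfterThisBuffer == 0) {
                blocked = true; // send nothing more until the receiver's reset arrives
            }
            return true;
        }

        void onMessage(ToSender msg) {
            if (msg == ToSender.ADD_CREDIT) {
                availableCredit++;
            } else {
                availableCredit = 0; // also wipes credit from AddCredits applied just before
                blocked = false;
            }
        }
    }

    static final class Receiver {
        final boolean useHandshake;
        int unusedFloating; // floating buffers granted as credit but not yet filled
        final Queue<ToSender> inFlight = new ArrayDeque<>();

        Receiver(boolean useHandshake) {
            this.useHandshake = useHandshake;
        }

        void grantFloating(int n) {
            unusedFloating += n;
            for (int i = 0; i < n; i++) {
                inFlight.add(ToSender.ADD_CREDIT);
            }
        }

        void onBuffer(int backlog) {
            unusedFloating--; // the incoming data occupies one floating buffer
            if (backlog == 0) {
                unusedFloating = 0; // release the remaining floating buffers
                if (useHandshake) {
                    inFlight.add(ToSender.RESET_CREDIT);
                }
            }
        }

        /** Delivers up to {@code count} queued messages to the sender. */
        void deliver(Sender sender, int count) {
            for (int i = 0; i < count && !inFlight.isEmpty(); i++) {
                sender.onMessage(inFlight.poll());
            }
        }
    }

    /** Returns {sender's available credit, receiver's unused floating buffers} at the end. */
    static int[] run(boolean useHandshake) {
        Sender sender = new Sender(useHandshake);
        Receiver receiver = new Receiver(useHandshake);
        receiver.grantFloating(3);
        receiver.deliver(sender, 2); // one AddCredit is still on the wire
        if (sender.send(0)) { // last buffer: backlog drops to 0
            receiver.onBuffer(0);
        }
        receiver.deliver(sender, Integer.MAX_VALUE); // flush everything in flight
        return new int[] {sender.availableCredit, receiver.unusedFloating};
    }

    public static void main(String[] args) {
        int[] naive = run(false);
        int[] handshake = run(true);
        System.out.println("naive: credit=" + naive[0] + ", free buffers=" + naive[1]);
        System.out.println("handshake: credit=" + handshake[0] + ", free buffers=" + handshake[1]);
    }
}
```

   `run(false)` ends with the sender holding 2 credits while the receiver has
   0 free floating buffers, i.e. the inconsistency described above;
   `run(true)` ends with both at 0, at the cost of the sender pausing until
   the reset arrives. The sketch also assumes the receiver grants no new
   floating credit for the channel until the sender announces a new backlog.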
   
   What do you think? Or is there a simpler way?




-- 
This is an automated message from the Apache Git Service.