wsry commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r665933067



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
##########
@@ -164,6 +168,27 @@ void addCreditOrResumeConsumption(
         }
     }
 
+    /**
+     * Announces remaining backlog to the consumer after the available data notification or data
+     * consumption resumption.
+     */
+    private void announceBacklog(NetworkSequenceViewReader reader) {
+        int backlog = reader.getRemainingBacklog();
+        if (backlog > 0) {

Review comment:
   `NetworkSequenceViewReader#isAvailable()` has already been called by `PartitionRequestQueue#enqueueAvailableReader`:
   ```
   private void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception {
       if (reader.isRegisteredAsAvailable() || !reader.isAvailable()) {
           return;
       }
       ......
   }
   ```
   I think we can change it to:
   ```
   private void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception {
       if (reader.isRegisteredAsAvailable()) {
           return;
       }

       ResultSubpartitionView.AvailabilityWithBacklog availabilityWithBacklog =
               reader.getAvailabilityAndBacklog();
       if (!availabilityWithBacklog.isAvailable()) {
           int backlog = availabilityWithBacklog.getBacklog();
           if (backlog > 0 && reader.needAnnounceBacklog()) {
               announceBacklog(reader, backlog);
           }
           return;
       }
       ......
   }
   ```
   What do you think?
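   To make the proposal concrete, here is a self-contained sketch of the control flow it describes. `AvailabilityWithBacklog` mirrors the `ResultSubpartitionView.AvailabilityWithBacklog` name used above, but this class, the `Reader` interface and the `Outcome` enum are hypothetical stand-ins written for illustration, not Flink's actual types. The point it shows is that availability and backlog come from one call, so the decision is made on a single snapshot instead of two separate reads.

   ```java
   /** Hypothetical stand-in for ResultSubpartitionView.AvailabilityWithBacklog. */
   final class AvailabilityWithBacklog {
       private final boolean available;
       private final int backlog;

       AvailabilityWithBacklog(boolean available, int backlog) {
           if (backlog < 0) {
               throw new IllegalArgumentException("backlog must be non-negative");
           }
           this.available = available;
           this.backlog = backlog;
       }

       boolean isAvailable() { return available; }

       int getBacklog() { return backlog; }
   }

   /** Hypothetical minimal reader: only the methods the proposal relies on. */
   interface Reader {
       boolean isRegisteredAsAvailable();

       AvailabilityWithBacklog getAvailabilityAndBacklog();

       boolean needAnnounceBacklog();
   }

   /** Sketch of the proposed control flow, reduced to a testable outcome. */
   final class EnqueueSketch {
       enum Outcome { ALREADY_REGISTERED, BACKLOG_ANNOUNCED, NOT_AVAILABLE, ENQUEUED }

       static Outcome enqueueAvailableReader(Reader reader) {
           if (reader.isRegisteredAsAvailable()) {
               return Outcome.ALREADY_REGISTERED;
           }
           // One call yields availability and backlog together.
           AvailabilityWithBacklog snapshot = reader.getAvailabilityAndBacklog();
           if (!snapshot.isAvailable()) {
               if (snapshot.getBacklog() > 0 && reader.needAnnounceBacklog()) {
                   // In the real queue this is where a BacklogAnnouncement would be written.
                   return Outcome.BACKLOG_ANNOUNCED;
               }
               return Outcome.NOT_AVAILABLE;
           }
           // The rest of the original method (registering and enqueueing the reader) is elided.
           return Outcome.ENQUEUED;
       }
   }
   ```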
   
   > After all, even in your version I think there can be a race condition where PartitionRequestQueue is notified that the reader is non-empty, you check `getRemainingBacklog()` and send the BacklogAnnouncement message, but before it gets processed by the receiver, some buffer is polled from this reader and the backlog goes down to 0. As a result, the receiver assigns a credit to a sender that doesn't have any data anymore?
   
   I don't think this problem exists, for two reasons. First, `getRemainingBacklog()` is called in the Netty thread, which guarantees ordering: the downstream always processes the `BacklogAnnouncement` before the buffer. Second, we only announce the backlog when no credit is available. If credit is available, only buffers (carrying the backlog) are sent; if no credit is available, only a `BacklogAnnouncement` is sent, and buffers always wait for credit. Announcing the same backlog twice is also not a problem, because the processing order is guaranteed.
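   The ordering argument can be checked with a toy, single-threaded model. Everything in it (`CreditModel`, `Message`, `senderStep`, `receiverStep`, `onBacklog`) is hypothetical and heavily simplified: one FIFO queue stands in for the Netty channel, credit flows back to the sender instantly, and the receiver treats an announced backlog as a target rather than adding it up, which is an assumption made so that a repeated announcement grants no extra credit. Because a buffer can only leave the sender when credit is available, the backlog cannot shrink between sending a `BacklogAnnouncement` and receiving the credit it triggers.

   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;

   /** Hypothetical toy model of the credit/backlog exchange; not Flink code. */
   class CreditModel {
       /** Wire message: a buffer (payload != null) or a backlog announcement (payload == null). */
       static final class Message {
           final String payload;
           final int backlog;

           Message(String payload, int backlog) {
               this.payload = payload;
               this.backlog = backlog;
           }

           boolean isBacklogAnnouncement() {
               return payload == null;
           }
       }

       // A single FIFO channel written by one thread, standing in for the Netty channel.
       final Deque<Message> wire = new ArrayDeque<>();
       // Sender state: queued buffers and the credit it currently holds.
       final Deque<String> senderBuffers = new ArrayDeque<>();
       int senderCredit = 0;
       // Receiver bookkeeping.
       int creditGranted = 0;
       int buffersReceived = 0;

       /** Sender: send buffers while credit lasts; with no credit, only announce the backlog. */
       void senderStep() {
           while (senderCredit > 0 && !senderBuffers.isEmpty()) {
               String buffer = senderBuffers.poll();
               senderCredit--;
               // Each buffer carries the backlog still queued behind it.
               wire.add(new Message(buffer, senderBuffers.size()));
           }
           if (senderCredit == 0 && !senderBuffers.isEmpty()) {
               // Buffers stay queued until credit arrives.
               wire.add(new Message(null, senderBuffers.size()));
           }
       }

       /** Receiver: process every message strictly in arrival order. */
       void receiverStep() {
           while (!wire.isEmpty()) {
               Message m = wire.poll();
               if (!m.isBacklogAnnouncement()) {
                   buffersReceived++;
               }
               onBacklog(m.backlog);
           }
       }

       // Grant only the credit missing to cover the announced backlog, so duplicates are harmless.
       private void onBacklog(int backlog) {
           int outstanding = creditGranted - buffersReceived;
           int extra = Math.max(0, backlog - outstanding);
           creditGranted += extra;
           senderCredit += extra; // credit returns instantly in this toy model
       }
   }
   ```

   With three queued buffers and no credit, calling `senderStep()` twice before `receiverStep()` puts the announcement on the wire twice, yet the credit granted still ends up equal to the number of buffers delivered.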




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
