wsry commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r665933067
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
##########
@@ -164,6 +168,27 @@ void addCreditOrResumeConsumption(
}
}
+ /**
+ * Announces remaining backlog to the consumer after the available data notification or data
+ * consumption resumption.
+ */
+ private void announceBacklog(NetworkSequenceViewReader reader) {
+ int backlog = reader.getRemainingBacklog();
+ if (backlog > 0) {
Review comment:
`NetworkSequenceViewReader#isAvailable()` has already been called by `PartitionRequestQueue#enqueueAvailableReader`:
```
private void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception {
    if (reader.isRegisteredAsAvailable() || !reader.isAvailable()) {
        return;
    }
    ......
}
```
I think we can change it to:
```
private void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception {
    if (reader.isRegisteredAsAvailable()) {
        return;
    }

    ResultSubpartitionView.AvailabilityWithBacklog availabilityWithBacklog =
            reader.getAvailabilityAndBacklog();
    if (!availabilityWithBacklog.isAvailable()) {
        int backlog = availabilityWithBacklog.getBacklog();
        if (backlog > 0 && reader.needAnnounceBacklog()) {
            announceBacklog(reader, backlog);
        }
        return;
    }
    ......
}
```
What do you think?
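A minimal, self-contained sketch of the decision the proposed `enqueueAvailableReader` would make, assuming the proposed `AvailabilityWithBacklog`, `getAvailabilityAndBacklog()` and `needAnnounceBacklog()` behave as their names suggest. These names come from the suggestion above and are not confirmed Flink API; `SketchReader`, `Decision`, `EnqueueSketch` and `fixedReader` are hypothetical helpers for illustration only. The point it shows is that one call returns both the availability flag and the backlog, so the reader is queried once rather than through separate `isAvailable()` and `getRemainingBacklog()` calls.

```java
/** Hypothetical stand-in for the proposed ResultSubpartitionView.AvailabilityWithBacklog. */
final class AvailabilityWithBacklog {
    private final boolean available;
    private final int backlog;

    AvailabilityWithBacklog(boolean available, int backlog) {
        this.available = available;
        this.backlog = backlog;
    }

    boolean isAvailable() { return available; }

    int getBacklog() { return backlog; }
}

/** Hypothetical minimal reader; NOT Flink's NetworkSequenceViewReader. */
interface SketchReader {
    boolean isRegisteredAsAvailable();

    AvailabilityWithBacklog getAvailabilityAndBacklog();

    boolean needAnnounceBacklog();
}

/** What the proposed method would do for a given reader state. */
enum Decision { SKIP, ANNOUNCE_BACKLOG, ENQUEUE }

final class EnqueueSketch {
    /** Mirrors the proposed control flow, returning the action instead of performing it. */
    static Decision decide(SketchReader reader) {
        if (reader.isRegisteredAsAvailable()) {
            return Decision.SKIP; // already queued
        }
        // Single query for both availability and backlog (assumed proposed API).
        AvailabilityWithBacklog state = reader.getAvailabilityAndBacklog();
        if (!state.isAvailable()) {
            // Nothing can be sent now, but data is pending: tell the consumer how much.
            if (state.getBacklog() > 0 && reader.needAnnounceBacklog()) {
                return Decision.ANNOUNCE_BACKLOG;
            }
            return Decision.SKIP;
        }
        return Decision.ENQUEUE; // corresponds to the elided "......" branch
    }

    /** Hypothetical factory for a reader with fixed state, for demonstration only. */
    static SketchReader fixedReader(
            boolean registered, boolean available, int backlog, boolean needAnnounce) {
        return new SketchReader() {
            @Override
            public boolean isRegisteredAsAvailable() { return registered; }

            @Override
            public AvailabilityWithBacklog getAvailabilityAndBacklog() {
                return new AvailabilityWithBacklog(available, backlog);
            }

            @Override
            public boolean needAnnounceBacklog() { return needAnnounce; }
        };
    }
}
```

Returning a `Decision` keeps the sketch testable; in the real method the `ANNOUNCE_BACKLOG` branch would call `announceBacklog(reader, backlog)` and the `ENQUEUE` branch would run the elided registration logic.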
> After all, even in your version I think there can be a race condition where `PartitionRequestQueue` is notified that the reader is non-empty, you check `getRemainingBacklog()` and send the `BacklogAnnouncement` message, but before it gets processed by the receiver, some buffer is polled from this reader and the backlog drops to 0. As a result, the receiver assigns credit to a sender that no longer has any data?
I think there is no such problem. First, `getRemainingBacklog()` is called in the Netty thread, so we can guarantee the order: the downstream will always process the `BacklogAnnouncement` before the buffer. Second, we only announce the backlog when there is no credit available. If credit is available, only buffers (each carrying the backlog) will be sent; if no credit is available, only a `BacklogAnnouncement` will be sent, and the buffer always waits for credit. It is also not a problem if we announce the same backlog twice, because the processing order is guaranteed.
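The ordering argument can be checked with a deliberately simplified, single-threaded model, assuming (1) both directions of the channel are FIFO, (2) all sender-side state changes happen on one thread, as in the Netty event loop, and (3) the receiver tops up credit to the announced backlog, so a repeated announcement grants nothing extra. `CreditModel` and its members are hypothetical and are not Flink classes; Flink's real receiver-side credit logic is more involved.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical, simplified model of the credit/backlog handshake (not Flink code).
 * One loop drives both sides and both directions are FIFO queues.
 */
final class CreditModel {
    /** Sender-to-receiver message: a data buffer or a BacklogAnnouncement. */
    static final class Msg {
        final boolean isBuffer; // false means BacklogAnnouncement
        final int backlog;      // sender backlog when the message was written
        Msg(boolean isBuffer, int backlog) {
            this.isBuffer = isBuffer;
            this.backlog = backlog;
        }
    }

    int senderBacklog;        // buffers still queued at the sender
    int senderCredit;         // credit received by the sender and not yet used
    int outstandingCredit;    // credit granted by the receiver, not yet consumed by a buffer
    int buffersReceived;
    int announcementsReceived;

    final Deque<Msg> toReceiver = new ArrayDeque<>();
    final Deque<Integer> toSender = new ArrayDeque<>(); // AddCredit amounts

    CreditModel(int backlog) {
        this.senderBacklog = backlog;
    }

    /** Sender: send a buffer only when it holds credit, otherwise only announce the backlog. */
    void senderStep() {
        while (!toSender.isEmpty()) {
            senderCredit += toSender.poll();
        }
        if (senderBacklog == 0) {
            return;
        }
        if (senderCredit > 0) {
            senderCredit--;
            senderBacklog--;
            toReceiver.add(new Msg(true, senderBacklog));
        } else {
            toReceiver.add(new Msg(false, senderBacklog));
        }
    }

    /** Receiver: process exactly one message, in arrival order. */
    void receiverStep() {
        Msg msg = toReceiver.poll();
        if (msg == null) {
            return;
        }
        if (msg.isBuffer) {
            // Buffers also carry the backlog; this simplified receiver ignores it.
            if (outstandingCredit <= 0) {
                throw new IllegalStateException("buffer arrived without credit");
            }
            outstandingCredit--;
            buffersReceived++;
        } else {
            announcementsReceived++;
            // Top credit up to the announced backlog; a duplicate announcement grants nothing.
            int grant = Math.max(0, msg.backlog - outstandingCredit);
            if (grant > 0) {
                outstandingCredit += grant;
                toSender.add(grant);
            }
        }
    }

    /** Runs until all data is delivered; the sender takes senderSteps steps per receiver step. */
    static CreditModel run(int backlog, int senderSteps) {
        CreditModel m = new CreditModel(backlog);
        for (int i = 0; i < 10_000; i++) {
            if (m.senderBacklog == 0 && m.toReceiver.isEmpty()) {
                return m;
            }
            for (int s = 0; s < senderSteps; s++) {
                m.senderStep();
            }
            m.receiverStep();
        }
        throw new IllegalStateException("did not converge");
    }
}
```

With `run(3, 2)` the sender announces the same backlog twice before the receiver reacts; the second announcement arrives ahead of any buffer and grants no extra credit, and the run ends with no credit left over on either side, which is what the FIFO-order argument predicts.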
--
This is an automated message from the Apache Git Service.