pnowojski commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r807945723
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -267,18 +267,31 @@ private void registerCallback() {
@Override
public void write(InputT element, Context context) throws IOException,
InterruptedException {
+ while (mailboxExecutor.tryYield()) {}
Review comment:
Sorry for the late response.
> 1. There are two types of scenarios where we enqueue to the mailbox: (1) to
handle fatal exceptions and (2) to add to the buffer any failed request
entries. I believe these should take priority over flushing new items?
Apart from that, (2) also serves a very important purpose: it decreases the
number of "in flight" requests, so that `AsyncSinkWriter` can induce back
pressure on upstream operators.
> I do agree the line is dubious; perhaps this is more appropriate (the
behaviour would remain identical):
```
while (inFlightRequestsCount > 0) {
mailboxExecutor.yield();
}
```
I think that would be better, BUT it would block writes and backpressure
upstream operators even if just a single element is still "in flight" and the
internal buffers are mostly empty. I'm not sure that's the intention?
Can you elaborate on what was wrong with the original condition
```
while (bufferedRequestEntries.size() >= maxBufferedRequests) {
mailboxExecutor.yield();
}
```
?
I would normally expect the following behaviour. Given some max buffer
capacity, two thresholds, and the following conditions:
a) start flushing if `size >= threshold_1`
b) block new writes/flushes once `size == max_capacity`
c) unblock once `size < threshold_2`
For example, `AsyncWaitOperator` implicitly defines `threshold_1 = 1` and
`threshold_2 = max_capacity`. But having those thresholds as some fractions of
max capacity could improve performance by limiting the number of small write
requests.
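To make that concrete, here is a minimal sketch of the scheme I have in mind.
Apart from `bufferedRequestEntries`, `maxBufferedRequests` and
`mailboxExecutor`, all names here are hypothetical, not the actual
`AsyncSinkWriter` code:
```
// Hypothetical two-threshold flow control; the concrete values are
// illustrative only.
private final int flushThreshold;   // threshold_1, e.g. maxBufferedRequests / 2
private final int unblockThreshold; // threshold_2, e.g. 3 * maxBufferedRequests / 4

private void maybeFlushAndBlock() throws InterruptedException {
    // a) start flushing once the buffer reaches threshold_1
    if (bufferedRequestEntries.size() >= flushThreshold) {
        flush();
    }
    // b) block new writes only when the buffer is completely full...
    if (bufferedRequestEntries.size() >= maxBufferedRequests) {
        // c) ...and unblock only after it has drained below threshold_2.
        // Yielding processes mails, e.g. completion callbacks that shrink
        // the buffer and decrease inFlightRequestsCount.
        while (bufferedRequestEntries.size() >= unblockThreshold) {
            mailboxExecutor.yield();
        }
    }
}
```
With `flushThreshold` and `unblockThreshold` strictly between `1` and
`maxBufferedRequests`, the buffer can keep absorbing writes while earlier
requests are still in flight.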
With the `inFlightRequestsCount > 0` condition to unblock, we are actually
guaranteeing that the operator will always be wasting a bit of IO resources.
In this sequence of events:
1. external system finishing the last IO write
2. information about this reaching Flink, decreasing `inFlightRequestsCount`
to `0`
3. Flink sending another write request to the external system
4. external system starting its own IO write
there will be a bit of delay between steps 1 and 4.
> Perhaps I'm mistaken, but I don't believe we have a busy loop here. i.e.
if mailboxExecutor.tryYield() returns true, there is some work to be done
elsewhere in the mailbox, then we perform that. Otherwise, it will return
false and the loop will end. I can't see where CPU resources are being wasted?
Yes, this particular snippet in this PR doesn't have busy waiting. The
version in the current master, however, actually has this exact problem. And
as far as I understand it, `AsyncSinkWriter#flush` is doing the busy waiting
both on master and in this PR?
On a side note, why do we need to have this logic spread among different
methods? Can't we have a single check
```
while (someCondition()) {
mailboxExecutor.yield();
}
```
?
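For example (again just a sketch; `someCondition()`, `yieldUntilReady()` and
`maxInFlightRequests` are names I made up, and the exact predicate would need
to cover both the write path and the flush path):
```
// Hypothetical consolidation: one predicate deciding when the mailbox
// thread must yield, shared by write() and flush().
private boolean someCondition() {
    return bufferedRequestEntries.size() >= maxBufferedRequests
            || inFlightRequestsCount >= maxInFlightRequests; // hypothetical limit
}

private void yieldUntilReady() throws InterruptedException {
    while (someCondition()) {
        // Processing mails completes in-flight requests and drains the buffer.
        mailboxExecutor.yield();
    }
}
```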