pnowojski commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r807945723



##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -267,18 +267,31 @@ private void registerCallback() {
 
     @Override
     public void write(InputT element, Context context) throws IOException, 
InterruptedException {
+        while (mailboxExecutor.tryYield()) {}

Review comment:
      Sorry for the somewhat late response.
   
   > 1. There are two types of scenarios where we enqueue to the mailbox (1) to 
handle fatal exceptions and (2) to add to the buffer any failed request 
entries. I believe, these should take priority over flushing new items?
   
   Apart from that, (2) also serves a very important purpose: it decreases the number of "in flight" requests, so that `AsyncSinkWriter` can induce backpressure on upstream operators.
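To illustrate the backpressure mechanism described above, here is a minimal, hypothetical sketch of in-flight accounting: a writer refuses new requests while too many are outstanding, and each completion callback (delivered via the mailbox in the real writer) decrements the counter so the writer can proceed again. The class and method names are assumptions for illustration, not part of `AsyncSinkWriter`.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: in-flight request accounting that induces backpressure.
final class InFlightGate {
    private final AtomicInteger inFlightRequestsCount = new AtomicInteger();
    private final int maxInFlightRequests;

    InFlightGate(int maxInFlightRequests) {
        this.maxInFlightRequests = maxInFlightRequests;
    }

    // Returns false while the limit is reached; in the real operator this
    // refusal is what surfaces as backpressure to upstream operators.
    boolean tryAcquire() {
        int current = inFlightRequestsCount.get();
        if (current >= maxInFlightRequests) {
            return false;
        }
        return inFlightRequestsCount.compareAndSet(current, current + 1);
    }

    // Called when a completion callback for a request is processed,
    // freeing a slot so blocked writers can make progress.
    void onRequestCompleted() {
        inFlightRequestsCount.decrementAndGet();
    }

    int inFlight() {
        return inFlightRequestsCount.get();
    }
}
```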
   
   > I do agree the line is dubious, perhaps, this is more appropriate: (the 
behaviour would remain identical)
   ```
   while (inFlightRequestsCount > 0) {
     mailboxExecutor.yield(); 
   }
   ```
   I think that would be better, BUT that would block writes and backpressure upstream operators even if just a single element is still "in flight" and the internal buffers are mostly empty. I'm not sure that's the intention?
   
   Can you elaborate what was wrong with the original condition
   ```
           while (bufferedRequestEntries.size() >= maxBufferedRequests) {
               mailboxExecutor.yield();
           }
   ```
   ?
   I mean, I would normally expect the following behaviour. Given some max buffer capacity, two thresholds, and the following conditions:
   a) start flushing if `size >= threshold_1`
   b) block new writes/flushes once `size == max_capacity`
   c) unblock if `size < threshold_2`
   
   Where for example `AsyncWaitOperator` implicitly defines `threshold_1 = 1` 
and `threshold_2 = max_capacity`. But having those thresholds as some fractions 
of max capacity could improve performance by limiting the amount of small write 
requests.
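A minimal sketch of the two-threshold scheme described above; the class name, constants, and concrete values are illustrative assumptions, not part of `AsyncSinkWriter`:

```java
// Hypothetical sketch of the two-threshold buffering scheme:
// flush early, block only at the hard cap, unblock with some hysteresis.
final class ThresholdBuffer {
    static final int MAX_CAPACITY = 100;      // hard cap on buffered entries
    static final int FLUSH_THRESHOLD = 50;    // threshold_1: start flushing
    static final int UNBLOCK_THRESHOLD = 75;  // threshold_2: resume writes

    // a) start flushing if size >= threshold_1
    static boolean shouldFlush(int size) {
        return size >= FLUSH_THRESHOLD;
    }

    // b) block new writes/flushes once size == max_capacity
    static boolean mustBlockWrites(int size) {
        return size == MAX_CAPACITY;
    }

    // c) unblock if size < threshold_2
    static boolean mayUnblock(int size) {
        return size < UNBLOCK_THRESHOLD;
    }
}
```

With `FLUSH_THRESHOLD = 1` and `UNBLOCK_THRESHOLD = MAX_CAPACITY` this degenerates to the `AsyncWaitOperator`-style behaviour; fractional thresholds instead batch work into fewer, larger write requests.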
   
   With the `inFlightRequestsCount > 0` condition to unblock, we are actually guaranteeing that the operator will always be wasting a bit of IO resources, as in this sequence of events:
   1. external system finishing the last IO write
   2. information about this reaching Flink, decreasing `inFlightRequestsCount` 
to `0`.
   3. Flink sending another write request to the external system
   4. external system starting its own IO write.
   
   there will always be a bit of delay between steps 1. and 4.
   
   > Perhaps I'm mistaken, but I don't believe we have a busy loop here. i.e. 
if mailboxExecutor.tryYield() returns true, there is some work to be elsewhere 
in the mailbox, then we perform that. Otherwise, it will return false and the 
loop will end. I can't see where CPU resources is being wasted?
   
   Yes, this particular snippet in this PR doesn't have busy waiting. The version on the current master actually has this exact problem, though. However, as far as I understand it, `AsyncSinkWriter#flush` is doing busy waiting both on master and in this PR?
   
   On a side note, why do we need to have this logic spread among different methods? Couldn't we have one single check
   ```
   while (someCondition()) {
     mailboxExecutor.yield();
   }
   ```
   ?
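A hypothetical sketch of what such a single-check consolidation could look like. Here Flink's `MailboxExecutor` is simulated by a plain `Deque` of `Runnable`s so the example is self-contained; `someCondition`, `yieldUntilReady`, and the other names are assumptions for illustration, not the actual `AsyncSinkWriter` API.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: all paths that must wait go through one
// yieldUntilReady() choke point guarded by a single someCondition().
final class SingleConditionWriter {
    // Simulated mailbox; in the real writer this is Flink's MailboxExecutor.
    private final Deque<Runnable> mailbox = new ArrayDeque<>();
    private int bufferedRequestEntries = 0;
    private final int maxBufferedRequests;

    SingleConditionWriter(int maxBufferedRequests) {
        this.maxBufferedRequests = maxBufferedRequests;
    }

    // The one place where "are we allowed to proceed?" is decided.
    private boolean someCondition() {
        return bufferedRequestEntries >= maxBufferedRequests;
    }

    private void yieldUntilReady() {
        while (someCondition()) {
            Runnable mail = mailbox.poll();
            if (mail == null) {
                throw new IllegalStateException("blocked with an empty mailbox");
            }
            mail.run(); // e.g. a completion callback that frees buffer space
        }
    }

    void write(String element) {
        yieldUntilReady();
        bufferedRequestEntries++; // element itself is irrelevant to the sketch
    }

    // Simulates the external system completing n requests: the callback is
    // enqueued to the mailbox and only applied when someone yields.
    void completeRequests(int n) {
        mailbox.add(() -> bufferedRequestEntries -= n);
    }

    int buffered() {
        return bufferedRequestEntries;
    }
}
```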




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

