pnowojski commented on a change in pull request #18651:
URL: https://github.com/apache/flink/pull/18651#discussion_r803467988



##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -267,18 +267,31 @@ private void registerCallback() {
 
     @Override
     public void write(InputT element, Context context) throws IOException, InterruptedException {
+        while (mailboxExecutor.tryYield()) {}

Review comment:
       Kind of, @dmvk. One small caveat is that "yield" means "yield to 
downstream". So it only yields to mailbox actions enqueued by the operator 
itself OR by downstream operators.
   
   I do, however, have a couple of suspicions about this:
   1. We shouldn't be yielding on the hot path, only if the operator's 
internal buffer is full.
   2. I don't understand why it's `tryYield()` in the first place. IMO it 
should have been `yield()` in the loop below from the get-go:
   ```
           while (bufferedRequestEntries.size() >= maxBufferedRequests) {
               mailboxExecutor.yield();
           }
   ```
   Otherwise we have an active (busy-waiting) loop that wastes CPU resources.
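
   To make the difference concrete, here is a minimal, self-contained sketch. 
It does not use Flink at all: `ToyMailbox`, `yieldBlocking` and 
`waitForCapacity` are hypothetical names invented for this illustration, and 
the toy mailbox only mimics the general idea of a non-blocking `tryYield()` 
versus a blocking `yield()`. It counts how many times the waiting loop runs 
while a single in-flight request completes on another thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class YieldSketch {

    /** Toy stand-in for a mailbox. Hypothetical; NOT Flink's MailboxExecutor. */
    static final class ToyMailbox {
        private final BlockingQueue<Runnable> mail = new LinkedBlockingQueue<>();

        /** Enqueue a mail; may be called from any thread. */
        void execute(Runnable action) {
            mail.add(action);
        }

        /** Runs one mail if present, otherwise returns false immediately. */
        boolean tryYield() {
            Runnable action = mail.poll();
            if (action == null) {
                return false;
            }
            action.run();
            return true;
        }

        /** Blocks until a mail is present, then runs it on the calling thread. */
        void yieldBlocking() {
            try {
                mail.take().run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    /** Waits for buffer capacity and returns the number of loop iterations. */
    static long waitForCapacity(boolean blocking) {
        ToyMailbox mailbox = new ToyMailbox();
        final int maxBufferedRequests = 1;
        // Only ever touched by the calling ("mailbox") thread.
        int[] bufferedRequests = {1}; // the buffer starts out full

        // Simulates an async request that completes after 50 ms and reports
        // completion by enqueueing a mail, instead of mutating state directly.
        Thread completer = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            mailbox.execute(() -> bufferedRequests[0]--);
        });
        completer.start();

        long iterations = 0;
        while (bufferedRequests[0] >= maxBufferedRequests) {
            if (blocking) {
                mailbox.yieldBlocking(); // parks the thread until a mail arrives
            } else {
                mailbox.tryYield();      // returns at once, so the loop spins
            }
            iterations++;
        }

        try {
            completer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return iterations;
    }

    public static void main(String[] args) {
        System.out.println("blocking iterations:   " + waitForCapacity(true));
        System.out.println("busy-wait iterations:  " + waitForCapacity(false));
    }
}
```

   In this toy model the blocking variant goes around the loop exactly once, 
while the `tryYield()` variant keeps spinning for the whole 50 ms. The exact 
spin count depends on the machine, so no number is claimed here.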
   
   
   A good example of how I believe a similar pattern is correctly implemented 
is `AsyncWaitOperator`.
   
   I am kind of lost here and I would need to understand why it was changed to 
`tryYield` as part of https://issues.apache.org/jira/browse/FLINK-24234. I'm 
afraid that I won't be able to dig into this until next week.





