pnowojski commented on a change in pull request #9564: 
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox 
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r322111247
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
 ##########
 @@ -80,9 +88,14 @@ public void run() {
                try {
                        while (running) {
                                LOG.debug("Wait for next completed async stream 
element result.");
-                               AsyncResult streamElementEntry = 
streamElementQueue.peekBlockingly();
-
-                               output(streamElementEntry);
+                               AsyncResult asyncResult = 
streamElementQueue.peekBlockingly();
+                               executor.submit(() -> {
+                                       try {
+                                               output(asyncResult);
+                                       } catch (InterruptedException e) {
 
 Review comment:
   Why do we need this `InterruptedException`? I see that it's coming (and it 
is the same in master branch) from `streamElementQueue.poll()`, however I don't 
understand it:
   1. We shouldn't be blocking neither in the mailbox thread (this version) nor 
under the checkpoint lock (master version), so this already looks suspicious.
   2. `poll` method that blocks is already "mildly" surprising contract on it's 
own
   3. From the code it looks like this `poll` call should never block and 
changing it to a proper poll (returning `Optional` or `null`) and adding a 
check state that the element is present, would work just as well but would be 
better/cleaner.
   
   @AHeise I remember that you were also rising some objections about the 
`Emitter` code and that it could be simplified and you wanted to work on that 
later. Did you have this in mind as well?
   
   Am I missing something? If not can you @AHeise create a JIRA ticket for 
cleaning up those things?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to