pnowojski commented on a change in pull request #9564:
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r322111247
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
##########
@@ -80,9 +88,14 @@ public void run() {
try {
while (running) {
LOG.debug("Wait for next completed async stream element result.");
- AsyncResult streamElementEntry = streamElementQueue.peekBlockingly();
-
- output(streamElementEntry);
+ AsyncResult asyncResult = streamElementQueue.peekBlockingly();
+ executor.submit(() -> {
+ try {
+ output(asyncResult);
+ } catch (InterruptedException e) {
Review comment:
Why do we need this `InterruptedException`? I see that it comes from
`streamElementQueue.poll()` (the same as in the master branch), but I don't
understand it:
1. We shouldn't be blocking either in the mailbox thread (this version) or
under the checkpoint lock (master version), so this already looks suspicious.
2. A `poll` method that blocks is already a "mildly" surprising contract on its
own.
3. From the code it looks like this `poll` call should never block. Changing it
to a proper poll (returning `Optional` or `null`) and adding a state check that
the element is present would work just as well, and would be cleaner.
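To make point 3 concrete, here is a minimal sketch of what a non-blocking
`poll` plus a state check could look like. `SimpleQueue` and `emitHead` are
hypothetical names used only for illustration; they are not the actual
`streamElementQueue` or `Emitter` code, and the sketch assumes the queue is
only touched from a single thread (for example the mailbox thread), so it
leaves out any synchronization.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

/** Illustration only: hypothetical stand-ins, not Flink's actual classes. */
class NonBlockingPollSketch {

    /** Minimal FIFO queue whose poll() never blocks and declares no InterruptedException. */
    static final class SimpleQueue<T> {
        private final Queue<T> elements = new ArrayDeque<>();

        void put(T element) {
            elements.add(element);
        }

        /** Returns the head element, or Optional.empty() if the queue is empty. */
        Optional<T> poll() {
            return Optional.ofNullable(elements.poll());
        }
    }

    /** Removes and returns the head; the caller must already know that an element is available. */
    static <T> T emitHead(SimpleQueue<T> queue) {
        Optional<T> head = queue.poll();
        // State check instead of a blocking wait: an empty queue here indicates a bug.
        if (!head.isPresent()) {
            throw new IllegalStateException("Expected an element at the head of the queue.");
        }
        return head.get();
    }

    public static void main(String[] args) {
        SimpleQueue<String> queue = new SimpleQueue<>();
        queue.put("result-1");
        System.out.println(emitHead(queue));
    }
}
```

With a contract like this, the caller would no longer need a
`catch (InterruptedException e)` around the emit step, and a violated
assumption would fail loudly instead of blocking.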
@AHeise I remember that you were also raising some objections about the
`Emitter` code, that it could be simplified, and that you wanted to work on
that later. Did you have this in mind as well?
Am I missing something? If not, @AHeise, could you create a JIRA ticket for
cleaning those things up?