pnowojski commented on a change in pull request #9426: [FLINK-12958] Integrate
AsyncWaitOperator with mailbox (preserving compatibility with legacy sources)
URL: https://github.com/apache/flink/pull/9426#discussion_r314197853
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java
##########
@@ -138,19 +145,33 @@ private void output(AsyncResult asyncResult) throws InterruptedException {
new Exception("An async function call terminated with an exception. " +
"Failing the AsyncWaitOperator.", e));
}
+ }
- // remove the peeked element from the async collector buffer so that it is no longer
- // checkpointed
+ // remove the peeked element from the async collector buffer so that it is no longer
+ // checkpointed
+ try {
streamElementQueue.poll();
-
- // notify the main thread that there is again space left in the async collector
- // buffer
- checkpointLock.notifyAll();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
+ };
+
+ CompletableFuture<Void> writebackFuture = CompletableFuture.runAsync(processingRequest, mailboxExecutor);
Review comment:
The logic for synchronizing on `checkpointLock` and notifying it should also be
hidden from the user.
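One way to read this suggestion, sketched under assumptions: the executor
itself could acquire the lock around each task and call `notifyAll()` when the
task finishes, so that callers only submit a plain `Runnable`. The
`LockingExecutor` and `LockingExecutorDemo` classes below are hypothetical
names made up for illustration. They are not part of Flink and do not reflect
the actual `mailboxExecutor` API or how this PR was ultimately resolved.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/**
 * Hypothetical wrapper (not a Flink class): runs every submitted task while
 * holding a shared lock and notifies all waiters on that lock afterwards.
 */
final class LockingExecutor implements Executor {
    private final Object lock;
    private final Executor delegate;

    LockingExecutor(Object lock, Executor delegate) {
        this.lock = lock;
        this.delegate = delegate;
    }

    @Override
    public void execute(Runnable task) {
        delegate.execute(() -> {
            synchronized (lock) {
                try {
                    task.run();
                } finally {
                    // Wake up threads waiting on the lock, even if the task failed.
                    lock.notifyAll();
                }
            }
        });
    }
}

/** Hypothetical usage: the caller never touches the lock directly. */
final class LockingExecutorDemo {
    /** Returns true if the submitted task observed that it held the lock. */
    static boolean taskRunsUnderLock() {
        Object checkpointLock = new Object();
        boolean[] heldLock = {false};
        // Runnable::run executes the task on the calling thread; a real
        // setup would pass whatever executor owns the processing thread.
        Executor executor = new LockingExecutor(checkpointLock, Runnable::run);
        CompletableFuture
            .runAsync(() -> heldLock[0] = Thread.holdsLock(checkpointLock), executor)
            .join();
        return heldLock[0];
    }
}
```

With a wrapper along these lines, the call site in the diff would stay as
`CompletableFuture.runAsync(processingRequest, executor)` and the
`synchronized`/`notifyAll()` handling would live in one place.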