pnowojski commented on a change in pull request #8853: [FLINK-12958] Integrate AsyncWaitOperator with mailbox
URL: https://github.com/apache/flink/pull/8853#discussion_r296743811
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
##########
@@ -272,12 +277,14 @@ public void initializeState(StateInitializationContext context) throws Exception
     @Override
     public void close() throws Exception {
         try {
-            assert(Thread.holdsLock(checkpointingLock));
-
             while (!queue.isEmpty()) {
-                // wait for the emitter thread to output the remaining elements
-                // for that he needs the checkpointing lock and thus we have to free it
-                checkpointingLock.wait();
+                // TODO: once we no longer support legacy sources, the following `if` can simply become
+                // `mainThreadExecutor.yield()'
+                if (!mainThreadExecutor.tryYield()) {
+                    // We give up the lock while there is no action for yielding, so that potentially a legacy source
+                    // loop (using the lock in/around letter execution) can make progress
+                    checkpointingLock.wait(10L);
Review comment:
Why do we need this timed wait?
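
For reference, here is a minimal, self-contained sketch of the pattern in question. It is not Flink code: `TimedWaitSketch`, `tryYield`, `completeOneUnderLock`, `drain` and `runDemo` are hypothetical stand-ins. The sketch assumes that whoever enqueues a letter takes the lock but never calls `notify()` on it. Under that assumption an untimed `wait()` might never wake up, whereas `wait(10L)` releases the lock, lets the other thread in, and re-checks the mailbox after at most roughly 10 ms.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

class TimedWaitSketch {
    // Hypothetical stand-in for the checkpointing lock.
    private final Object lock = new Object();
    // Hypothetical stand-in for the mailbox: letters enqueued by other threads.
    private final ConcurrentLinkedQueue<Runnable> mailbox = new ConcurrentLinkedQueue<>();
    // Hypothetical stand-in for the number of elements still in the operator's queue.
    private final AtomicInteger pending;

    TimedWaitSketch(int elements) {
        this.pending = new AtomicInteger(elements);
    }

    // Runs one letter if there is one; returns false when there was nothing to run.
    private boolean tryYield() {
        Runnable letter = mailbox.poll();
        if (letter == null) {
            return false;
        }
        letter.run();
        return true;
    }

    // Producer side (assumption): needs the lock, enqueues a letter, never notifies.
    private void completeOneUnderLock() {
        synchronized (lock) {
            mailbox.add(pending::decrementAndGet);
        }
    }

    // Mirrors the loop in the diff: yield if possible, otherwise give up the lock briefly.
    private void drain() throws InterruptedException {
        synchronized (lock) {
            while (pending.get() > 0) {
                if (!tryYield()) {
                    // Releases the lock for up to 10 ms so the producer can enter,
                    // then re-checks the mailbox even though nobody called notify().
                    lock.wait(10L);
                }
            }
        }
    }

    // Returns the number of elements left after draining.
    static int runDemo(int elements) {
        TimedWaitSketch sketch = new TimedWaitSketch(elements);
        Thread producer = new Thread(() -> {
            for (int i = 0; i < elements; i++) {
                sketch.completeOneUnderLock();
            }
        });
        producer.start();
        try {
            sketch.drain();
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sketch.pending.get();
    }
}
```

If the producer in this sketch called `lock.notifyAll()` after enqueueing, an untimed `wait()` would be enough, which is the alternative the question points at.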