AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r315546676
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##########
 @@ -387,35 +365,19 @@ private void stopResources(boolean waitForShutdown) 
throws InterruptedException
         * Add the given stream element queue entry to the operator's stream 
element queue. This
         * operation blocks until the element has been added.
         *
-        * <p>For that it tries to put the element into the queue and if not 
successful then it waits on
-        * the checkpointing lock. The checkpointing lock is also used by the 
{@link Emitter} to output
-        * elements. The emitter is also responsible for notifying this method 
if the queue has capacity
-        * left again, by calling notifyAll on the checkpointing lock.
-        *
         * @param streamElementQueueEntry to add to the operator's queue
         * @param <T> Type of the stream element queue entry's result
         * @throws InterruptedException if the current thread has been 
interrupted
         */
        private <T> void addAsyncBufferEntry(StreamElementQueueEntry<T> 
streamElementQueueEntry) throws InterruptedException {
-               assert(Thread.holdsLock(checkpointingLock));
-
-               pendingStreamElementQueueEntry = streamElementQueueEntry;
-
                while (!queue.tryPut(streamElementQueueEntry)) {
-                       // we wait for the emitter to notify us if the queue 
has space left again
-                       checkpointingLock.wait();
+                       yieldToDownstream();
                }
-
-               pendingStreamElementQueueEntry = null;
        }
 
-       private void waitInFlightInputsFinished() throws InterruptedException {
-               assert(Thread.holdsLock(checkpointingLock));
-
+       private void waitInFlightInputsFinished() {
                while (!queue.isEmpty()) {
-                       // wait for the emitter thread to output the remaining 
elements
-                       // for that he needs the checkpointing lock and thus we 
have to free it
-                       checkpointingLock.wait();
+                       yieldToDownstream();
 
 Review comment:
   If we do yield now before migrating processing timers, we can run into 
deadlocks.
   
   Imagine the following: asyncIO is using an external system that is currently 
not available. The implementation relies on timeouts to occur. Thus, the queue 
of the AsyncWaitOperator is full with tasks that require the timeout to happen. 
At that point, we need to release the checkpoint lock instead of yielding 
(since processor timers are not migrated yet). Thus, we need to alternate 
between yieldToDownstream and releasing checkpoint lock.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to