AHeise commented on a change in pull request #9383: [FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r315107092
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 ##########
 @@ -387,35 +365,19 @@ private void stopResources(boolean waitForShutdown) throws InterruptedException
         * Add the given stream element queue entry to the operator's stream element queue. This
         * operation blocks until the element has been added.
         *
-        * <p>For that it tries to put the element into the queue and if not successful then it waits on
-        * the checkpointing lock. The checkpointing lock is also used by the {@link Emitter} to output
-        * elements. The emitter is also responsible for notifying this method if the queue has capacity
-        * left again, by calling notifyAll on the checkpointing lock.
-        *
         * @param streamElementQueueEntry to add to the operator's queue
         * @param <T> Type of the stream element queue entry's result
         * @throws InterruptedException if the current thread has been interrupted
         */
        private <T> void addAsyncBufferEntry(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
-               assert(Thread.holdsLock(checkpointingLock));
-
-               pendingStreamElementQueueEntry = streamElementQueueEntry;
-
                while (!queue.tryPut(streamElementQueueEntry)) {
-                       // we wait for the emitter to notify us if the queue has space left again
-                       checkpointingLock.wait();
+                       yieldToDownstream();
                }
-
-               pendingStreamElementQueueEntry = null;
        }
 
-       private void waitInFlightInputsFinished() throws InterruptedException {
-               assert(Thread.holdsLock(checkpointingLock));
-
+       private void waitInFlightInputsFinished() {
                while (!queue.isEmpty()) {
-                       // wait for the emitter thread to output the remaining elements
-                       // for that he needs the checkpointing lock and thus we have to free it
-                       checkpointingLock.wait();
+                       yieldToDownstream();
 
 Review comment:
   You are right: now that the emitter thread always enqueues results, that would work fine.
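   For readers following the hunk above, here is a minimal single-threaded sketch of the wait-loop pattern it switches to: instead of blocking on a lock until another thread frees capacity, the loop runs one pending downstream action per iteration. This is an illustration only, not Flink's implementation. The class `YieldingPutSketch`, the `mailbox` of `Runnable`s, and the integer elements are hypothetical names introduced for the example; only `tryPut`, `yieldToDownstream`, `addAsyncBufferEntry`, and `waitInFlightInputsFinished` echo names from the diff, and their bodies here are assumptions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch: a bounded in-flight queue drained by "mails" that the
// same thread runs whenever it would otherwise have to wait.
final class YieldingPutSketch {
    private final int capacity;
    private final Queue<Integer> queue = new ArrayDeque<>();    // in-flight elements
    private final Queue<Runnable> mailbox = new ArrayDeque<>(); // pending downstream actions
    private final List<Integer> emitted = new ArrayList<>();    // elements sent downstream

    YieldingPutSketch(int capacity) {
        this.capacity = capacity;
    }

    // Non-blocking put: returns false when the queue is full.
    boolean tryPut(int element) {
        if (queue.size() >= capacity) {
            return false;
        }
        queue.add(element);
        // Assumption for the sketch: the result is ready immediately, so the
        // action that emits it is enqueued right away. In a real operator an
        // asynchronous completion would enqueue it later.
        mailbox.add(() -> emitted.add(queue.remove()));
        return true;
    }

    // Runs at most one pending downstream action, which may free queue capacity.
    void yieldToDownstream() {
        Runnable mail = mailbox.poll();
        if (mail != null) {
            mail.run();
        }
    }

    // Same loop shape as the "+" lines in the diff: retry the put, yielding in between.
    void addAsyncBufferEntry(int element) {
        while (!tryPut(element)) {
            yieldToDownstream();
        }
    }

    // Drains all in-flight elements by yielding until the queue is empty.
    void waitInFlightInputsFinished() {
        while (!queue.isEmpty()) {
            yieldToDownstream();
        }
    }

    static List<Integer> demo() {
        YieldingPutSketch op = new YieldingPutSketch(2);
        for (int i = 1; i <= 5; i++) {
            op.addAsyncBufferEntry(i);
        }
        op.waitInFlightInputsFinished();
        return op.emitted;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

   With a capacity of 2 and five elements, `demo()` returns the elements in insertion order, `[1, 2, 3, 4, 5]`. The loops only terminate because every queued element has a matching pending action, which mirrors the point above about results always being enqueued.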

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
