zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327973730

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
##########
@@ -218,88 +172,99 @@ public int size() {
     }

     /**
-     * Callback for onComplete events for the given stream element queue entry. Whenever a queue
-     * entry is completed, it is checked whether this entry belongs to the first set. If this is the
-     * case, then the element is added to the completed entries queue from where it can be consumed.
-     * If the first set becomes empty, then the next set is polled from the uncompleted entries
-     * queue. Completed entries from this new set are then added to the completed entries queue.
-     *
-     * @param streamElementQueueEntry which has been completed
-     * @throws InterruptedException if the current thread has been interrupted while performing the
-     *     on complete callback.
+     * An entry that notifies the respective stage upon completion.
      */
-    public void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
-        lock.lockInterruptibly();
+    static class StagedStreamRecordQueueEntry<OUT> extends StreamRecordQueueEntry<OUT> {
+        private final Stage stage;

Review comment:
`Stage<OUT>`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
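
For context on the review comment: the field in the diff is declared with the raw type `Stage`, and the suggestion appears to be to parameterize it as `Stage<OUT>` so the compiler can check what the entry hands to its stage. The following is only a minimal sketch of that difference. `RawTypeSketch`, its nested `Stage` and `StagedEntry`, and the methods `completed` and `poll` are hypothetical stand-ins invented for illustration; they are not the classes or signatures from the pull request.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class RawTypeSketch {

    // Hypothetical stand-in for a generic stage; NOT the PR's actual Stage class.
    static class Stage<OUT> {
        private final Queue<OUT> completedResults = new ArrayDeque<>();

        // Records a completed result for this stage.
        void completed(OUT result) {
            completedResults.add(result);
        }

        // Returns the next completed result, or null if there is none.
        OUT poll() {
            return completedResults.poll();
        }
    }

    // Hypothetical entry that notifies its stage upon completion.
    static class StagedEntry<OUT> {
        // Parameterized field. With a raw `Stage stage` the call below would
        // still compile, but only with an unchecked warning and no type check.
        private final Stage<OUT> stage;

        StagedEntry(Stage<OUT> stage) {
            this.stage = stage;
        }

        void complete(OUT result) {
            stage.completed(result); // checked: result must be an OUT
        }
    }

    public static void main(String[] args) {
        Stage<String> stage = new Stage<>();
        StagedEntry<String> entry = new StagedEntry<>(stage);
        entry.complete("done");
        String polled = stage.poll(); // no cast needed
        System.out.println(polled);
    }
}
```

With the parameterized field, passing a value of the wrong type to `complete` is rejected at compile time, whereas a raw field would defer such a mistake to a possible `ClassCastException` at the point where the result is read.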