zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328060993
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueue.java
 ##########
 @@ -218,88 +172,99 @@ public int size() {
        }
 
        /**
-        * Callback for onComplete events for the given stream element queue 
entry. Whenever a queue
-        * entry is completed, it is checked whether this entry belongs to the 
first set. If this is the
-        * case, then the element is added to the completed entries queue from 
where it can be consumed.
-        * If the first set becomes empty, then the next set is polled from the 
uncompleted entries
-        * queue. Completed entries from this new set are then added to the 
completed entries queue.
-        *
-        * @param streamElementQueueEntry which has been completed
-        * @throws InterruptedException if the current thread has been 
interrupted while performing the
-        *      on complete callback.
+        * An entry that notifies the respective stage upon completion.
         */
-       public void onCompleteHandler(StreamElementQueueEntry<?> 
streamElementQueueEntry) throws InterruptedException {
-               lock.lockInterruptibly();
+       static class StagedStreamRecordQueueEntry<OUT> extends 
StreamRecordQueueEntry<OUT> {
+               private final Stage stage;
 
-               try {
-                       if (firstSet.remove(streamElementQueueEntry)) {
-                               completedQueue.offer(streamElementQueueEntry);
+               public StagedStreamRecordQueueEntry(StreamRecord<?> 
inputRecord, Stage stage) {
+                       super(inputRecord);
+                       this.stage = stage;
+               }
 
-                               while (firstSet.isEmpty() && firstSet != 
lastSet) {
-                                       firstSet = uncompletedQueue.poll();
+               @Override
+               public void complete(Collection<OUT> result) {
+                       super.complete(result);
+                       this.stage.completed(this);
+               }
+       }
 
-                                       Iterator<StreamElementQueueEntry<?>> it 
= firstSet.iterator();
+       /**
+        * A stage is a collection of queue entries that can be completed in 
arbitrary order.
+        */
+       static class Stage<OUT> {
+               /** Undrained finished elements. */
+               private final Queue<StreamElementQueueEntry<OUT>> 
completedQueue;
 
-                                       while (it.hasNext()) {
-                                               StreamElementQueueEntry<?> 
bufferEntry = it.next();
+               /** Finished and unfinished input elements. Used solely for 
checkpointing. */
+               private final Set<StreamElementQueueEntry<OUT>> pendingElements;
 
-                                               if (bufferEntry.isDone()) {
-                                                       
completedQueue.offer(bufferEntry);
-                                                       it.remove();
-                                               }
-                                       }
-                               }
+               Stage(int initialCapacity) {
+                       completedQueue = new ArrayDeque<>(initialCapacity);
+                       pendingElements = new HashSet<>(initialCapacity);
+               }
 
-                               LOG.debug("Signal unordered stream element 
queue has completed entries.");
-                               hasCompletedEntries.signalAll();
+               /**
+                * Signal that an entry finished computation.
+                */
+               void completed(StreamElementQueueEntry<OUT> elementQueueEntry) {
+                       // adding only to completed queue if not completed 
before
+                       // there may be a real result coming after the actual 
result, which is updated in the queue entry but
+                       // the entry is not readded to the complete queue
+                       if (pendingElements.remove(elementQueueEntry)) {
+                               completedQueue.add(elementQueueEntry);
                        }
-               } finally {
-                       lock.unlock();
                }
-       }
 
-       /**
-        * Add the given stream element queue entry to the current last set if 
it is not a watermark.
-        * If it is a watermark, then stop adding to the current last set, 
insert the watermark into its
-        * own set and add a new last set.
-        *
-        * @param streamElementQueueEntry to be inserted
-        * @param <T> Type of the stream element queue entry's result
-        */
-       private <T> void addEntry(StreamElementQueueEntry<T> 
streamElementQueueEntry) {
-               assert(lock.isHeldByCurrentThread());
+               /**
+                * True if there are no incomplete elements and all complete 
elements have been consumed.
+                */
+               boolean isEmpty() {
+                       return pendingElements.isEmpty() && 
completedQueue.isEmpty();
+               }
 
-               if (streamElementQueueEntry.isWatermark()) {
-                       lastSet = new HashSet<>(capacity);
+               boolean hasCompleted() {
+                       return !completedQueue.isEmpty();
+               }
 
-                       if (firstSet.isEmpty()) {
-                               firstSet.add(streamElementQueueEntry);
-                       } else {
-                               Set<StreamElementQueueEntry<?>> watermarkSet = 
new HashSet<>(1);
-                               watermarkSet.add(streamElementQueueEntry);
-                               uncompletedQueue.offer(watermarkSet);
+               /**
+                * Adds the stages input elements for checkpointing including 
completed but not yet emitted elements.
 
 Review comment:
   Typo in the Javadoc line above: "stages" -> "staged"?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to