zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r327954620
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
 ##########
 @@ -19,117 +19,66 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.concurrent.Executor;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
 
 /**
- * Ordered {@link StreamElementQueue} implementation. The ordered stream 
element queue emits
+ * Ordered {@link StreamElementQueue} implementation. The ordered stream 
element queue provides
  * asynchronous results in the order in which the {@link 
StreamElementQueueEntry} have been added
  * to the queue. Thus, even if the completion order can be arbitrary, the 
output order strictly
  * follows the insertion order (element cannot overtake each other).
  */
 @Internal
-public class OrderedStreamElementQueue implements StreamElementQueue {
+public final class OrderedStreamElementQueue<OUT> implements 
StreamElementQueue<OUT> {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(OrderedStreamElementQueue.class);
 
        /** Capacity of this queue. */
        private final int capacity;
 
-       /** Executor to run the onCompletion callback. */
-       private final Executor executor;
-
-       /** Operator actions to signal a failure to the operator. */
-       private final OperatorActions operatorActions;
-
-       /** Lock and conditions for the blocking queue. */
-       private final ReentrantLock lock;
-       private final Condition notFull;
-       private final Condition headIsCompleted;
-
        /** Queue for the inserted StreamElementQueueEntries. */
-       private final ArrayDeque<StreamElementQueueEntry<?>> queue;
-
-       public OrderedStreamElementQueue(
-                       int capacity,
-                       Executor executor,
-                       OperatorActions operatorActions) {
+       private final Queue<StreamElementQueueEntry<OUT>> queue;
 
+       public OrderedStreamElementQueue(int capacity) {
                Preconditions.checkArgument(capacity > 0, "The capacity must be 
larger than 0.");
-               this.capacity = capacity;
-
-               this.executor = Preconditions.checkNotNull(executor, 
"executor");
-
-               this.operatorActions = 
Preconditions.checkNotNull(operatorActions, "operatorActions");
-
-               this.lock = new ReentrantLock(false);
-               this.headIsCompleted = lock.newCondition();
-               this.notFull = lock.newCondition();
 
+               this.capacity = capacity;
                this.queue = new ArrayDeque<>(capacity);
        }
 
        @Override
-       public AsyncResult peekBlockingly() throws InterruptedException {
-               lock.lockInterruptibly();
-
-               try {
-                       while (queue.isEmpty() || !queue.peek().isDone()) {
-                               headIsCompleted.await();
-                       }
-
-                       LOG.debug("Peeked head element from ordered stream 
element queue with filling degree " +
-                               "({}/{}).", queue.size(), capacity);
-
-                       return queue.peek();
-               } finally {
-                       lock.unlock();
-               }
+       public boolean hasCompleted() {
+               return !queue.isEmpty() && queue.peek().isDone();
        }
 
        @Override
-       public AsyncResult poll() throws InterruptedException {
-               lock.lockInterruptibly();
-
-               try {
-                       while (queue.isEmpty() || !queue.peek().isDone()) {
-                               headIsCompleted.await();
-                       }
-
-                       notFull.signalAll();
-
-                       LOG.debug("Polled head element from ordered stream 
element queue. New filling degree " +
-                               "({}/{}).", queue.size() - 1, capacity);
-
-                       return queue.poll();
-               } finally {
-                       lock.unlock();
+       public void emitCompleted(TimestampedCollector<OUT> output) {
+               if (hasCompleted()) {
+                       final StreamElementQueueEntry<OUT> head = queue.poll();
+                       head.emitResult(output);
                }
        }
 
        @Override
-       public Collection<StreamElementQueueEntry<?>> values() throws 
InterruptedException {
-               lock.lockInterruptibly();
-
-               try {
-                       StreamElementQueueEntry<?>[] array = new 
StreamElementQueueEntry[queue.size()];
-
-                       array = queue.toArray(array);
-
-                       return Arrays.asList(array);
-               } finally {
-                       lock.unlock();
+       public List<StreamElement> values() {
+               List<StreamElement> list = new ArrayList<>();
+               for (StreamElementQueueEntry e : this.queue) {
 
 Review comment:
   I guess the `this` keyword is only recommended inside constructor scope, though I am not
quite sure whether there is a specific rule for it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to