zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] 
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328525320
 
 

 ##########
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
 ##########
 @@ -18,110 +18,82 @@
 
 package org.apache.flink.streaming.api.operators.async.queue;
 
-import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
+import static 
org.apache.flink.streaming.api.operators.async.queue.QueueUtil.popCompleted;
+import static 
org.apache.flink.streaming.api.operators.async.queue.QueueUtil.putSucessfully;
+
 
 /**
  * {@link OrderedStreamElementQueue} specific tests.
  */
 public class OrderedStreamElementQueueTest extends TestLogger {
-
-       private static final long timeout = 10000L;
-       private static ExecutorService executor;
-
-       @BeforeClass
-       public static void setup() {
-               executor = Executors.newFixedThreadPool(3);
-       }
-
-       @AfterClass
-       public static void shutdown() {
-               executor.shutdown();
-
-               try {
-                       if (!executor.awaitTermination(timeout, 
TimeUnit.MILLISECONDS)) {
-                               executor.shutdownNow();
-                       }
-               } catch (InterruptedException interrupted) {
-                       executor.shutdownNow();
-
-                       Thread.currentThread().interrupt();
-               }
-       }
-
        /**
         * Tests that only the head element is pulled from the ordered queue if 
it has been
         * completed.
         */
        @Test
-       public void testCompletionOrder() throws Exception {
-               OperatorActions operatorActions = mock(OperatorActions.class);
-               final OrderedStreamElementQueue queue = new 
OrderedStreamElementQueue(4, executor, operatorActions);
-
-               StreamRecordQueueEntry<Integer> entry1 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(1, 0L));
-               StreamRecordQueueEntry<Integer> entry2 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(2, 1L));
-               WatermarkQueueEntry entry3 = new WatermarkQueueEntry(new 
Watermark(2L));
-               StreamRecordQueueEntry<Integer> entry4 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(3, 3L));
-
-               List<StreamElementQueueEntry<?>> expected = 
Arrays.asList(entry1, entry2, entry3, entry4);
-
-               for (StreamElementQueueEntry<?> entry : expected) {
-                       queue.put(entry);
-               }
+       public void testCompletionOrder() {
+               final OrderedStreamElementQueue<Integer> queue = new 
OrderedStreamElementQueue<>(4);
 
-               CompletableFuture<List<AsyncResult>> pollOperation = 
CompletableFuture.supplyAsync(
-                       () -> {
-                               List<AsyncResult> result = new ArrayList<>(4);
-                               while (!queue.isEmpty()) {
-                                       try {
-                                               result.add(queue.poll());
-                                       } catch (InterruptedException e) {
-                                               throw new 
CompletionException(e);
-                                       }
-                               }
-
-                               return result;
-                       },
-                       executor);
-
-               Thread.sleep(10L);
-
-               Assert.assertFalse(pollOperation.isDone());
-
-               entry2.complete(Collections.<Integer>emptyList());
-
-               entry4.complete(Collections.<Integer>emptyList());
-
-               Thread.sleep(10L);
+               ResultFuture<Integer> entry1 = putSucessfully(queue, new 
StreamRecord<>(1, 0L));
+               ResultFuture<Integer> entry2 = putSucessfully(queue, new 
StreamRecord<>(2, 1L));
+               putSucessfully(queue, new Watermark(2L));
+               ResultFuture<Integer> entry4 = putSucessfully(queue, new 
StreamRecord<>(3, 3L));
 
+               Assert.assertEquals(Collections.emptyList(), 
popCompleted(queue));
                Assert.assertEquals(4, queue.size());
+               Assert.assertFalse(queue.isEmpty());
 
-               entry1.complete(Collections.<Integer>emptyList());
+               entry2.complete(Collections.singleton(11));
+               entry4.complete(Collections.singleton(13));
 
-               Assert.assertEquals(expected, pollOperation.get());
+               Assert.assertEquals(Collections.emptyList(), 
popCompleted(queue));
+               Assert.assertEquals(4, queue.size());
+               Assert.assertFalse(queue.isEmpty());
+
+               entry1.complete(Collections.singleton(10));
+
+               List<StreamElement> expected = Arrays.asList(
+                               new StreamRecord<>(10, 0L),
+                               new StreamRecord<>(11, 1L),
+                               new Watermark(2L),
+                               new StreamRecord<>(13, 3L));
+               Assert.assertEquals(expected, popCompleted(queue));
+               Assert.assertEquals(0, queue.size());
+               Assert.assertTrue(queue.isEmpty());
+       }
 
-               verify(operatorActions, 
never()).failOperator(any(Exception.class));
+       /**
+        * Tests two adjacent watermarks can be processed successfully.
+        */
+       @Test
+       public void testWatermarkOnly() {
+               final StreamElementQueue<Integer> queue = new 
OrderedStreamElementQueue<>(8);
+
+               putSucessfully(queue, new Watermark(2L));
+               putSucessfully(queue, new Watermark(5L));
+
+               Assert.assertEquals(2, queue.size());
+               Assert.assertFalse(queue.isEmpty());
+
+               Assert.assertEquals(Arrays.asList(
+                               new Watermark(2L),
 
 Review comment:
   ditto: indentation

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to