zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328541881
 
 

 ##########
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
 ##########
 @@ -18,174 +18,108 @@
 
 package org.apache.flink.streaming.api.operators.async.queue;
 
-import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
+
+import static 
org.apache.flink.streaming.api.operators.async.queue.QueueUtil.popCompleted;
+import static 
org.apache.flink.streaming.api.operators.async.queue.QueueUtil.putSucessfully;
 
 /**
  * {@link UnorderedStreamElementQueue} specific tests.
  */
 public class UnorderedStreamElementQueueTest extends TestLogger {
-       private static final long timeout = 10000L;
-       private static ExecutorService executor;
-
-       @BeforeClass
-       public static void setup() {
-               executor = Executors.newFixedThreadPool(3);
-       }
-
-       @AfterClass
-       public static void shutdown() {
-               executor.shutdown();
-
-               try {
-                       if (!executor.awaitTermination(timeout, 
TimeUnit.MILLISECONDS)) {
-                               executor.shutdownNow();
-                       }
-               } catch (InterruptedException interrupted) {
-                       executor.shutdownNow();
-
-                       Thread.currentThread().interrupt();
-               }
-       }
-
        /**
         * Tests that only elements before the oldest watermark are returned if 
they are completed.
         */
        @Test
-       public void testCompletionOrder() throws Exception {
-               OperatorActions operatorActions = mock(OperatorActions.class);
-
-               final UnorderedStreamElementQueue queue = new 
UnorderedStreamElementQueue(8, executor, operatorActions);
-
-               StreamRecordQueueEntry<Integer> record1 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(1, 0L));
-               StreamRecordQueueEntry<Integer> record2 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(2, 1L));
-               WatermarkQueueEntry watermark1 = new WatermarkQueueEntry(new 
Watermark(2L));
-               StreamRecordQueueEntry<Integer> record3 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(3, 3L));
-               StreamRecordQueueEntry<Integer> record4 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(4, 4L));
-               WatermarkQueueEntry watermark2 = new WatermarkQueueEntry(new 
Watermark(5L));
-               StreamRecordQueueEntry<Integer> record5 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(5, 6L));
-               StreamRecordQueueEntry<Integer> record6 = new 
StreamRecordQueueEntry<>(new StreamRecord<>(6, 7L));
-
-               List<StreamElementQueueEntry<?>> entries = 
Arrays.asList(record1, record2, watermark1, record3,
-                       record4, watermark2, record5, record6);
-
-               // The queue should look like R1, R2, W1, R3, R4, W2, R5, R6
-               for (StreamElementQueueEntry<?> entry : entries) {
-                       queue.put(entry);
-               }
-
-               Assert.assertTrue(8 == queue.size());
-
-               CompletableFuture<AsyncResult> firstPoll = 
CompletableFuture.supplyAsync(
-                       () -> {
-                               try {
-                                       return queue.poll();
-                               } catch (InterruptedException e) {
-                                       throw new CompletionException(e);
-                               }
-                       },
-                       executor);
-
-               // this should not fulfill the poll, because R3 is behind W1
-               record3.complete(Collections.<Integer>emptyList());
+       public void testCompletionOrder() {
+               final UnorderedStreamElementQueue<Integer> queue = new 
UnorderedStreamElementQueue<>(8);
 
-               Thread.sleep(10L);
+               ResultFuture<Integer> record1 = putSucessfully(queue, new 
StreamRecord<>(1, 0L));
+               ResultFuture<Integer> record2 = putSucessfully(queue, new 
StreamRecord<>(2, 1L));
+               putSucessfully(queue, new Watermark(2L));
+               ResultFuture<Integer> record3 = putSucessfully(queue, new 
StreamRecord<>(3, 3L));
+               ResultFuture<Integer> record4 = putSucessfully(queue, new 
StreamRecord<>(4, 4L));
+               putSucessfully(queue, new Watermark(5L));
+               ResultFuture<Integer> record5 = putSucessfully(queue, new 
StreamRecord<>(5, 6L));
+               ResultFuture<Integer> record6 = putSucessfully(queue, new 
StreamRecord<>(6, 7L));
 
-               Assert.assertFalse(firstPoll.isDone());
+               Assert.assertEquals(Collections.emptyList(), 
popCompleted(queue));
+               Assert.assertEquals(8, queue.size());
+               Assert.assertFalse(queue.isEmpty());
 
-               record2.complete(Collections.<Integer>emptyList());
+               // this should not make any item completed, because R3 is 
behind W1
+               record3.complete(Arrays.asList(13));
 
-               Assert.assertEquals(record2, firstPoll.get());
+               Assert.assertEquals(Collections.emptyList(), 
popCompleted(queue));
+               Assert.assertEquals(8, queue.size());
+               Assert.assertFalse(queue.isEmpty());
 
-               CompletableFuture<AsyncResult> secondPoll = 
CompletableFuture.supplyAsync(
-                       () -> {
-                               try {
-                                       return queue.poll();
-                               } catch (InterruptedException e) {
-                                       throw new CompletionException(e);
-                               }
-                       },
-                       executor);
+               record2.complete(Arrays.asList(12));
 
-               record6.complete(Collections.<Integer>emptyList());
-               record4.complete(Collections.<Integer>emptyList());
+               Assert.assertEquals(Arrays.asList(new StreamRecord<>(12, 1L)), 
popCompleted(queue));
+               Assert.assertEquals(7, queue.size());
+               Assert.assertFalse(queue.isEmpty());
 
-               Thread.sleep(10L);
+               // Should not be completed because R1 has not been completed yet
+               record6.complete(Arrays.asList(16));
+               record4.complete(Arrays.asList(14));
 
-               // The future should not be completed because R1 has not been 
completed yet
-               Assert.assertFalse(secondPoll.isDone());
-
-               record1.complete(Collections.<Integer>emptyList());
-
-               Assert.assertEquals(record1, secondPoll.get());
+               Assert.assertEquals(Collections.emptyList(), 
popCompleted(queue));
+               Assert.assertEquals(7, queue.size());
+               Assert.assertFalse(queue.isEmpty());
 
                // Now W1, R3, R4 and W2 are completed and should be pollable
-               Assert.assertEquals(watermark1, queue.poll());
-
-               // The order of R3 and R4 is not specified
-               Set<AsyncResult> expected = new HashSet<>(2);
-               expected.add(record3);
-               expected.add(record4);
-
-               Set<AsyncResult> actual = new HashSet<>(2);
-
-               actual.add(queue.poll());
-               actual.add(queue.poll());
-
-               Assert.assertEquals(expected, actual);
-
-               Assert.assertEquals(watermark2, queue.poll());
-
-               // since R6 has been completed before and W2 has been consumed, 
we should be able to poll
-               // this record as well
-               Assert.assertEquals(record6, queue.poll());
+               record1.complete(Arrays.asList(11));
+
+               Assert.assertEquals(Arrays.asList(
+                               new StreamRecord<>(11, 0L),
+                               new Watermark(2L),
+                               new StreamRecord<>(13, 3L),
+                               new StreamRecord<>(14, 4L),
+                               new Watermark(5L),
+                               new StreamRecord<>(16, 7L)),
+                               popCompleted(queue));
+               Assert.assertEquals(1, queue.size());
+               Assert.assertFalse(queue.isEmpty());
 
                // only R5 left in the queue
-               Assert.assertTrue(1 == queue.size());
+               record5.complete(Arrays.asList(15));
 
-               CompletableFuture<AsyncResult> thirdPoll = 
CompletableFuture.supplyAsync(
-                       () -> {
-                               try {
-                                       return queue.poll();
-                               } catch (InterruptedException e) {
-                                       throw new CompletionException(e);
-                               }
-                       },
-                       executor);
+               Assert.assertEquals(Arrays.asList(new StreamRecord<>(15, 6L)), 
popCompleted(queue));
+               Assert.assertEquals(0, queue.size());
+               Assert.assertTrue(queue.isEmpty());
+               Assert.assertEquals(Arrays.asList(), popCompleted(queue));
 
 Review comment:
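   Use `Collections.emptyList()` instead of `Arrays.asList()` here, for consistency with the other empty-result assertions in this test.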

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to