zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime]
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328544533
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
##########
@@ -18,174 +18,108 @@
package org.apache.flink.streaming.api.operators.async.queue;
-import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.TestLogger;
-import org.junit.AfterClass;
import org.junit.Assert;
-import org.junit.BeforeClass;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
+
+import static
org.apache.flink.streaming.api.operators.async.queue.QueueUtil.popCompleted;
+import static
org.apache.flink.streaming.api.operators.async.queue.QueueUtil.putSucessfully;
/**
* {@link UnorderedStreamElementQueue} specific tests.
*/
public class UnorderedStreamElementQueueTest extends TestLogger {
- private static final long timeout = 10000L;
- private static ExecutorService executor;
-
- @BeforeClass
- public static void setup() {
- executor = Executors.newFixedThreadPool(3);
- }
-
- @AfterClass
- public static void shutdown() {
- executor.shutdown();
-
- try {
- if (!executor.awaitTermination(timeout,
TimeUnit.MILLISECONDS)) {
- executor.shutdownNow();
- }
- } catch (InterruptedException interrupted) {
- executor.shutdownNow();
-
- Thread.currentThread().interrupt();
- }
- }
-
/**
* Tests that only elements before the oldest watermark are returned if
they are completed.
*/
@Test
- public void testCompletionOrder() throws Exception {
- OperatorActions operatorActions = mock(OperatorActions.class);
-
- final UnorderedStreamElementQueue queue = new
UnorderedStreamElementQueue(8, executor, operatorActions);
-
- StreamRecordQueueEntry<Integer> record1 = new
StreamRecordQueueEntry<>(new StreamRecord<>(1, 0L));
- StreamRecordQueueEntry<Integer> record2 = new
StreamRecordQueueEntry<>(new StreamRecord<>(2, 1L));
- WatermarkQueueEntry watermark1 = new WatermarkQueueEntry(new
Watermark(2L));
- StreamRecordQueueEntry<Integer> record3 = new
StreamRecordQueueEntry<>(new StreamRecord<>(3, 3L));
- StreamRecordQueueEntry<Integer> record4 = new
StreamRecordQueueEntry<>(new StreamRecord<>(4, 4L));
- WatermarkQueueEntry watermark2 = new WatermarkQueueEntry(new
Watermark(5L));
- StreamRecordQueueEntry<Integer> record5 = new
StreamRecordQueueEntry<>(new StreamRecord<>(5, 6L));
- StreamRecordQueueEntry<Integer> record6 = new
StreamRecordQueueEntry<>(new StreamRecord<>(6, 7L));
-
- List<StreamElementQueueEntry<?>> entries =
Arrays.asList(record1, record2, watermark1, record3,
- record4, watermark2, record5, record6);
-
- // The queue should look like R1, R2, W1, R3, R4, W2, R5, R6
- for (StreamElementQueueEntry<?> entry : entries) {
- queue.put(entry);
- }
-
- Assert.assertTrue(8 == queue.size());
-
- CompletableFuture<AsyncResult> firstPoll =
CompletableFuture.supplyAsync(
- () -> {
- try {
- return queue.poll();
- } catch (InterruptedException e) {
- throw new CompletionException(e);
- }
- },
- executor);
-
- // this should not fulfill the poll, because R3 is behind W1
- record3.complete(Collections.<Integer>emptyList());
+ public void testCompletionOrder() {
+ final UnorderedStreamElementQueue<Integer> queue = new
UnorderedStreamElementQueue<>(8);
- Thread.sleep(10L);
+ ResultFuture<Integer> record1 = putSucessfully(queue, new
StreamRecord<>(1, 0L));
+ ResultFuture<Integer> record2 = putSucessfully(queue, new
StreamRecord<>(2, 1L));
+ putSucessfully(queue, new Watermark(2L));
+ ResultFuture<Integer> record3 = putSucessfully(queue, new
StreamRecord<>(3, 3L));
+ ResultFuture<Integer> record4 = putSucessfully(queue, new
StreamRecord<>(4, 4L));
+ putSucessfully(queue, new Watermark(5L));
+ ResultFuture<Integer> record5 = putSucessfully(queue, new
StreamRecord<>(5, 6L));
+ ResultFuture<Integer> record6 = putSucessfully(queue, new
StreamRecord<>(6, 7L));
- Assert.assertFalse(firstPoll.isDone());
+ Assert.assertEquals(Collections.emptyList(),
popCompleted(queue));
+ Assert.assertEquals(8, queue.size());
+ Assert.assertFalse(queue.isEmpty());
- record2.complete(Collections.<Integer>emptyList());
+ // this should not make any item completed, because R3 is
behind W1
+ record3.complete(Arrays.asList(13));
- Assert.assertEquals(record2, firstPoll.get());
+ Assert.assertEquals(Collections.emptyList(),
popCompleted(queue));
+ Assert.assertEquals(8, queue.size());
+ Assert.assertFalse(queue.isEmpty());
- CompletableFuture<AsyncResult> secondPoll =
CompletableFuture.supplyAsync(
- () -> {
- try {
- return queue.poll();
- } catch (InterruptedException e) {
- throw new CompletionException(e);
- }
- },
- executor);
+ record2.complete(Arrays.asList(12));
- record6.complete(Collections.<Integer>emptyList());
- record4.complete(Collections.<Integer>emptyList());
+ Assert.assertEquals(Arrays.asList(new StreamRecord<>(12, 1L)),
popCompleted(queue));
+ Assert.assertEquals(7, queue.size());
+ Assert.assertFalse(queue.isEmpty());
- Thread.sleep(10L);
+ // Should not be completed because R1 has not been completed yet
+ record6.complete(Arrays.asList(16));
+ record4.complete(Arrays.asList(14));
- // The future should not be completed because R1 has not been
completed yet
- Assert.assertFalse(secondPoll.isDone());
-
- record1.complete(Collections.<Integer>emptyList());
-
- Assert.assertEquals(record1, secondPoll.get());
+ Assert.assertEquals(Collections.emptyList(),
popCompleted(queue));
+ Assert.assertEquals(7, queue.size());
+ Assert.assertFalse(queue.isEmpty());
// Now W1, R3, R4 and W2 are completed and should be pollable
- Assert.assertEquals(watermark1, queue.poll());
-
- // The order of R3 and R4 is not specified
- Set<AsyncResult> expected = new HashSet<>(2);
- expected.add(record3);
- expected.add(record4);
-
- Set<AsyncResult> actual = new HashSet<>(2);
-
- actual.add(queue.poll());
- actual.add(queue.poll());
-
- Assert.assertEquals(expected, actual);
-
- Assert.assertEquals(watermark2, queue.poll());
-
- // since R6 has been completed before and W2 has been consumed,
we should be able to poll
- // this record as well
- Assert.assertEquals(record6, queue.poll());
+ record1.complete(Arrays.asList(11));
+
+ Assert.assertEquals(Arrays.asList(
+ new StreamRecord<>(11, 0L),
+ new Watermark(2L),
+ new StreamRecord<>(13, 3L),
+ new StreamRecord<>(14, 4L),
+ new Watermark(5L),
+ new StreamRecord<>(16, 7L)),
+ popCompleted(queue));
+ Assert.assertEquals(1, queue.size());
+ Assert.assertFalse(queue.isEmpty());
// only R5 left in the queue
- Assert.assertTrue(1 == queue.size());
+ record5.complete(Arrays.asList(15));
- CompletableFuture<AsyncResult> thirdPoll =
CompletableFuture.supplyAsync(
- () -> {
- try {
- return queue.poll();
- } catch (InterruptedException e) {
- throw new CompletionException(e);
- }
- },
- executor);
+ Assert.assertEquals(Arrays.asList(new StreamRecord<>(15, 6L)),
popCompleted(queue));
+ Assert.assertEquals(0, queue.size());
+ Assert.assertTrue(queue.isEmpty());
+ Assert.assertEquals(Arrays.asList(), popCompleted(queue));
+ }
- Thread.sleep(10L);
- Assert.assertFalse(thirdPoll.isDone());
+ /**
+ * Tests two adjacent watermarks can be processed successfully.
+ */
+ @Test
+ public void testWatermarkOnly() {
Review comment:
This test is totally the same with
`OrderedStreamElementQueueTest#testWatermarkOnly`, so we could migrate it into
`StreamElementQueueTest` instead.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services