zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime]
Reducing synchronization in AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9717#discussion_r328525293
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
##########
@@ -18,110 +18,82 @@
package org.apache.flink.streaming.api.operators.async.queue;
-import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.TestLogger;
-import org.junit.AfterClass;
import org.junit.Assert;
-import org.junit.BeforeClass;
import org.junit.Test;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
+import static
org.apache.flink.streaming.api.operators.async.queue.QueueUtil.popCompleted;
+import static
org.apache.flink.streaming.api.operators.async.queue.QueueUtil.putSucessfully;
+
/**
* {@link OrderedStreamElementQueue} specific tests.
*/
public class OrderedStreamElementQueueTest extends TestLogger {
-
- private static final long timeout = 10000L;
- private static ExecutorService executor;
-
- @BeforeClass
- public static void setup() {
- executor = Executors.newFixedThreadPool(3);
- }
-
- @AfterClass
- public static void shutdown() {
- executor.shutdown();
-
- try {
- if (!executor.awaitTermination(timeout,
TimeUnit.MILLISECONDS)) {
- executor.shutdownNow();
- }
- } catch (InterruptedException interrupted) {
- executor.shutdownNow();
-
- Thread.currentThread().interrupt();
- }
- }
-
/**
* Tests that only the head element is pulled from the ordered queue if
it has been
* completed.
*/
@Test
- public void testCompletionOrder() throws Exception {
- OperatorActions operatorActions = mock(OperatorActions.class);
- final OrderedStreamElementQueue queue = new
OrderedStreamElementQueue(4, executor, operatorActions);
-
- StreamRecordQueueEntry<Integer> entry1 = new
StreamRecordQueueEntry<>(new StreamRecord<>(1, 0L));
- StreamRecordQueueEntry<Integer> entry2 = new
StreamRecordQueueEntry<>(new StreamRecord<>(2, 1L));
- WatermarkQueueEntry entry3 = new WatermarkQueueEntry(new
Watermark(2L));
- StreamRecordQueueEntry<Integer> entry4 = new
StreamRecordQueueEntry<>(new StreamRecord<>(3, 3L));
-
- List<StreamElementQueueEntry<?>> expected =
Arrays.asList(entry1, entry2, entry3, entry4);
-
- for (StreamElementQueueEntry<?> entry : expected) {
- queue.put(entry);
- }
+ public void testCompletionOrder() {
+ final OrderedStreamElementQueue<Integer> queue = new
OrderedStreamElementQueue<>(4);
- CompletableFuture<List<AsyncResult>> pollOperation =
CompletableFuture.supplyAsync(
- () -> {
- List<AsyncResult> result = new ArrayList<>(4);
- while (!queue.isEmpty()) {
- try {
- result.add(queue.poll());
- } catch (InterruptedException e) {
- throw new
CompletionException(e);
- }
- }
-
- return result;
- },
- executor);
-
- Thread.sleep(10L);
-
- Assert.assertFalse(pollOperation.isDone());
-
- entry2.complete(Collections.<Integer>emptyList());
-
- entry4.complete(Collections.<Integer>emptyList());
-
- Thread.sleep(10L);
+ ResultFuture<Integer> entry1 = putSucessfully(queue, new
StreamRecord<>(1, 0L));
+ ResultFuture<Integer> entry2 = putSucessfully(queue, new
StreamRecord<>(2, 1L));
+ putSucessfully(queue, new Watermark(2L));
+ ResultFuture<Integer> entry4 = putSucessfully(queue, new
StreamRecord<>(3, 3L));
+ Assert.assertEquals(Collections.emptyList(),
popCompleted(queue));
Assert.assertEquals(4, queue.size());
+ Assert.assertFalse(queue.isEmpty());
- entry1.complete(Collections.<Integer>emptyList());
+ entry2.complete(Collections.singleton(11));
+ entry4.complete(Collections.singleton(13));
- Assert.assertEquals(expected, pollOperation.get());
+ Assert.assertEquals(Collections.emptyList(),
popCompleted(queue));
+ Assert.assertEquals(4, queue.size());
+ Assert.assertFalse(queue.isEmpty());
+
+ entry1.complete(Collections.singleton(10));
+
+ List<StreamElement> expected = Arrays.asList(
+ new StreamRecord<>(10, 0L),
+ new StreamRecord<>(11, 1L),
+ new Watermark(2L),
+ new StreamRecord<>(13, 3L));
+ Assert.assertEquals(expected, popCompleted(queue));
+ Assert.assertEquals(0, queue.size());
+ Assert.assertTrue(queue.isEmpty());
+ }
- verify(operatorActions,
never()).failOperator(any(Exception.class));
+ /**
+ * Tests two adjacent watermarks can be processed successfully.
+ */
+ @Test
+ public void testWatermarkOnly() {
+ final StreamElementQueue<Integer> queue = new
OrderedStreamElementQueue<>(8);
+
+ putSucessfully(queue, new Watermark(2L));
+ putSucessfully(queue, new Watermark(5L));
+
+ Assert.assertEquals(2, queue.size());
+ Assert.assertFalse(queue.isEmpty());
+
+ Assert.assertEquals(Arrays.asList(
+ new Watermark(2L),
+ new Watermark(5L)),
+ popCompleted(queue));
+ Assert.assertEquals(0, queue.size());
+ Assert.assertTrue(queue.isEmpty());
+ Assert.assertEquals(Arrays.asList(), popCompleted(queue));
Review comment:
IntelliJ inspection suggests using `Collections.emptyList()` instead of `Arrays.asList()` here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services