zhijiangW commented on a change in pull request #9717: [FLINK-14044] [runtime] Reducing synchronization in AsyncWaitOperator URL: https://github.com/apache/flink/pull/9717#discussion_r328525320
########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java ########## @@ -18,110 +18,82 @@ package org.apache.flink.streaming.api.operators.async.queue; -import org.apache.flink.streaming.api.operators.async.OperatorActions; +import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.TestLogger; -import org.junit.AfterClass; import org.junit.Assert; -import org.junit.BeforeClass; import org.junit.Test; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; +import static org.apache.flink.streaming.api.operators.async.queue.QueueUtil.popCompleted; +import static org.apache.flink.streaming.api.operators.async.queue.QueueUtil.putSucessfully; + /** * {@link OrderedStreamElementQueue} specific tests. */ public class OrderedStreamElementQueueTest extends TestLogger { - - private static final long timeout = 10000L; - private static ExecutorService executor; - - @BeforeClass - public static void setup() { - executor = Executors.newFixedThreadPool(3); - } - - @AfterClass - public static void shutdown() { - executor.shutdown(); - - try { - if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) { - executor.shutdownNow(); - } - } catch (InterruptedException interrupted) { - executor.shutdownNow(); - - Thread.currentThread().interrupt(); - } - } - /** * Tests that only the head element is pulled from the ordered queue if it has been * completed. */ @Test - public void testCompletionOrder() throws Exception { - OperatorActions operatorActions = mock(OperatorActions.class); - final OrderedStreamElementQueue queue = new OrderedStreamElementQueue(4, executor, operatorActions); - - StreamRecordQueueEntry<Integer> entry1 = new StreamRecordQueueEntry<>(new StreamRecord<>(1, 0L)); - StreamRecordQueueEntry<Integer> entry2 = new StreamRecordQueueEntry<>(new StreamRecord<>(2, 1L)); - WatermarkQueueEntry entry3 = new WatermarkQueueEntry(new Watermark(2L)); - StreamRecordQueueEntry<Integer> entry4 = new StreamRecordQueueEntry<>(new StreamRecord<>(3, 3L)); - - List<StreamElementQueueEntry<?>> expected = Arrays.asList(entry1, entry2, entry3, entry4); - - for (StreamElementQueueEntry<?> entry : expected) { - queue.put(entry); - } + public void testCompletionOrder() { + final OrderedStreamElementQueue<Integer> queue = new OrderedStreamElementQueue<>(4); - CompletableFuture<List<AsyncResult>> pollOperation = CompletableFuture.supplyAsync( - () -> { - List<AsyncResult> result = new ArrayList<>(4); - while (!queue.isEmpty()) { - try { - result.add(queue.poll()); - } catch (InterruptedException e) { - throw new CompletionException(e); - } - } - - return result; - }, - executor); - - Thread.sleep(10L); - - Assert.assertFalse(pollOperation.isDone()); - - entry2.complete(Collections.<Integer>emptyList()); - - entry4.complete(Collections.<Integer>emptyList()); - - Thread.sleep(10L); + ResultFuture<Integer> entry1 = putSucessfully(queue, new StreamRecord<>(1, 0L)); + ResultFuture<Integer> entry2 = putSucessfully(queue, new StreamRecord<>(2, 1L)); + putSucessfully(queue, new Watermark(2L)); + ResultFuture<Integer> entry4 = putSucessfully(queue, new StreamRecord<>(3, 3L)); + Assert.assertEquals(Collections.emptyList(), popCompleted(queue)); Assert.assertEquals(4, queue.size()); + Assert.assertFalse(queue.isEmpty()); - entry1.complete(Collections.<Integer>emptyList()); + entry2.complete(Collections.singleton(11)); + entry4.complete(Collections.singleton(13)); - Assert.assertEquals(expected, pollOperation.get()); + Assert.assertEquals(Collections.emptyList(), popCompleted(queue)); + Assert.assertEquals(4, queue.size()); + Assert.assertFalse(queue.isEmpty()); + + entry1.complete(Collections.singleton(10)); + + List<StreamElement> expected = Arrays.asList( + new StreamRecord<>(10, 0L), + new StreamRecord<>(11, 1L), + new Watermark(2L), + new StreamRecord<>(13, 3L)); + Assert.assertEquals(expected, popCompleted(queue)); + Assert.assertEquals(0, queue.size()); + Assert.assertTrue(queue.isEmpty()); + } - verify(operatorActions, never()).failOperator(any(Exception.class)); + /** + * Tests two adjacent watermarks can be processed successfully. + */ + @Test + public void testWatermarkOnly() { + final StreamElementQueue<Integer> queue = new OrderedStreamElementQueue<>(8); + + putSucessfully(queue, new Watermark(2L)); + putSucessfully(queue, new Watermark(5L)); + + Assert.assertEquals(2, queue.size()); + Assert.assertFalse(queue.isEmpty()); + + Assert.assertEquals(Arrays.asList( + new Watermark(2L), Review comment: ditto: indentation ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services