This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit e5a23caf340b05a4be076584da512129afb4e95f Author: Stephan Ewen <[email protected]> AuthorDate: Tue Sep 15 16:44:49 2020 +0200 [hotfix][tests] Extend test coverage for FutureCompletingBlockingQueue. --- .../FutureCompletingBlockingQueueTest.java | 91 +++++++++++++++++++--- 1 file changed, 82 insertions(+), 9 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java index 2a191d2..cdfef25 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.base.source.reader.synchronization; +import org.apache.flink.connector.base.source.reader.SourceReaderOptions; import org.apache.flink.runtime.io.AvailabilityProvider; import org.junit.Test; @@ -32,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -40,8 +42,8 @@ import static org.junit.Assert.fail; * The unit test for {@link FutureCompletingBlockingQueue}. */ public class FutureCompletingBlockingQueueTest { - private static final Integer DEFAULT_CAPACITY = 2; - private static final Integer SPECIFIED_CAPACITY = 20000; + + private static final int DEFAULT_CAPACITY = 2; @Test public void testBasics() throws InterruptedException { @@ -76,6 +78,16 @@ public class FutureCompletingBlockingQueueTest { } @Test + public void testPollEmptyQueue() throws InterruptedException { + FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(); + queue.put(0, 1234); + + assertNotNull(queue.poll()); + assertNull(queue.poll()); + assertNull(queue.poll()); + } + + @Test public void testWakeUpPut() throws InterruptedException { FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(1); @@ -144,13 +156,74 @@ public class FutureCompletingBlockingQueueTest { } @Test - public void testFutureCompletingBlockingQueueConstructor() { - FutureCompletingBlockingQueue<Object> defaultCapacityFutureCompletingBlockingQueue = new FutureCompletingBlockingQueue<>(); - FutureCompletingBlockingQueue<Object> specifiedCapacityFutureCompletingBlockingQueue = new FutureCompletingBlockingQueue<>(SPECIFIED_CAPACITY); - // The capacity of the queue needs to be equal to 10000 - assertEquals(defaultCapacityFutureCompletingBlockingQueue.remainingCapacity(), (int) DEFAULT_CAPACITY); - // The capacity of the queue needs to be equal to SPECIFIED_CAPACITY - assertEquals(specifiedCapacityFutureCompletingBlockingQueue.remainingCapacity(), (int) SPECIFIED_CAPACITY); + public void testSpecifiedQueueCapacity() { + final int capacity = 8_000; + final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>(capacity); + assertEquals(capacity, queue.remainingCapacity()); + } + + @Test + public void testQueueDefaultCapacity() { + final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>(); + assertEquals(DEFAULT_CAPACITY, queue.remainingCapacity()); + assertEquals(DEFAULT_CAPACITY, SourceReaderOptions.ELEMENT_QUEUE_CAPACITY.defaultValue().intValue()); + } + + @Test + public void testUnavailableWhenEmpty() { + final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>(); + assertFalse(queue.getAvailabilityFuture().isDone()); + } + + @Test + public void testImmediatelyAvailableAfterPut() throws InterruptedException { + final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>(); + queue.put(0, new Object()); + assertTrue(queue.getAvailabilityFuture().isDone()); + } + + @Test + public void testFutureBecomesAvailableAfterPut() throws InterruptedException { + final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>(); + final CompletableFuture<?> future = queue.getAvailabilityFuture(); + queue.put(0, new Object()); + assertTrue(future.isDone()); + } + + @Test + public void testUnavailableWhenBecomesEmpty() throws InterruptedException { + final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>(); + queue.put(0, new Object()); + queue.poll(); + assertFalse(queue.getAvailabilityFuture().isDone()); + } + + @Test + public void testAvailableAfterNotifyAvailable() throws InterruptedException { + final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>(); + queue.notifyAvailable(); + assertTrue(queue.getAvailabilityFuture().isDone()); + } + + @Test + public void testFutureBecomesAvailableAfterNotifyAvailable() throws InterruptedException { + final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>(); + final CompletableFuture<?> future = queue.getAvailabilityFuture(); + queue.notifyAvailable(); + assertTrue(future.isDone()); + } + + @Test + public void testPollResetsAvailability() throws InterruptedException { + final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>(); + queue.notifyAvailable(); + + final CompletableFuture<?> beforePoll = queue.getAvailabilityFuture(); + queue.poll(); + final CompletableFuture<?> afterPoll = queue.getAvailabilityFuture(); + + assertTrue(beforePoll.isDone()); + assertFalse(afterPoll.isDone()); } /**
