This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e5a23caf340b05a4be076584da512129afb4e95f
Author: Stephan Ewen <[email protected]>
AuthorDate: Tue Sep 15 16:44:49 2020 +0200

    [hotfix][tests] Extend test coverage for FutureCompletingBlockingQueue.
---
 .../FutureCompletingBlockingQueueTest.java         | 91 +++++++++++++++++++---
 1 file changed, 82 insertions(+), 9 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
index 2a191d2..cdfef25 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.base.source.reader.synchronization;
 
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
 import org.apache.flink.runtime.io.AvailabilityProvider;
 
 import org.junit.Test;
@@ -32,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -40,8 +42,8 @@ import static org.junit.Assert.fail;
  * The unit test for {@link FutureCompletingBlockingQueue}.
  */
 public class FutureCompletingBlockingQueueTest {
-       private static final Integer DEFAULT_CAPACITY = 2;
-       private static final Integer SPECIFIED_CAPACITY = 20000;
+
+       private static final int DEFAULT_CAPACITY = 2;
 
        @Test
        public void testBasics() throws InterruptedException {
@@ -76,6 +78,16 @@ public class FutureCompletingBlockingQueueTest {
        }
 
        @Test
+       public void testPollEmptyQueue() throws InterruptedException {
+               FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>();
+               queue.put(0, 1234);
+
+               assertNotNull(queue.poll());
+               assertNull(queue.poll());
+               assertNull(queue.poll());
+       }
+
+       @Test
        public void testWakeUpPut() throws InterruptedException {
                FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(1);
 
@@ -144,13 +156,74 @@ public class FutureCompletingBlockingQueueTest {
        }
 
        @Test
-       public void testFutureCompletingBlockingQueueConstructor() {
-               FutureCompletingBlockingQueue<Object> defaultCapacityFutureCompletingBlockingQueue = new FutureCompletingBlockingQueue<>();
-               FutureCompletingBlockingQueue<Object> specifiedCapacityFutureCompletingBlockingQueue = new FutureCompletingBlockingQueue<>(SPECIFIED_CAPACITY);
-               // The capacity of the queue needs to be equal to 10000
-               assertEquals(defaultCapacityFutureCompletingBlockingQueue.remainingCapacity(), (int) DEFAULT_CAPACITY);
-               // The capacity of the queue needs to be equal to SPECIFIED_CAPACITY
-               assertEquals(specifiedCapacityFutureCompletingBlockingQueue.remainingCapacity(), (int) SPECIFIED_CAPACITY);
+       public void testSpecifiedQueueCapacity() {
+               final int capacity = 8_000;
+               final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>(capacity);
+               assertEquals(capacity, queue.remainingCapacity());
+       }
+
+       @Test
+       public void testQueueDefaultCapacity() {
+               final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
+               assertEquals(DEFAULT_CAPACITY, queue.remainingCapacity());
+               assertEquals(DEFAULT_CAPACITY, SourceReaderOptions.ELEMENT_QUEUE_CAPACITY.defaultValue().intValue());
+       }
+
+       @Test
+       public void testUnavailableWhenEmpty() {
+               final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
+               assertFalse(queue.getAvailabilityFuture().isDone());
+       }
+
+       @Test
+       public void testImmediatelyAvailableAfterPut() throws InterruptedException {
+               final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
+               queue.put(0, new Object());
+               assertTrue(queue.getAvailabilityFuture().isDone());
+       }
+
+       @Test
+       public void testFutureBecomesAvailableAfterPut() throws InterruptedException {
+               final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
+               final CompletableFuture<?> future = queue.getAvailabilityFuture();
+               queue.put(0, new Object());
+               assertTrue(future.isDone());
+       }
+
+       @Test
+       public void testUnavailableWhenBecomesEmpty() throws InterruptedException {
+               final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
+               queue.put(0, new Object());
+               queue.poll();
+               assertFalse(queue.getAvailabilityFuture().isDone());
+       }
+
+       @Test
+       public void testAvailableAfterNotifyAvailable() throws InterruptedException {
+               final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
+               queue.notifyAvailable();
+               assertTrue(queue.getAvailabilityFuture().isDone());
+       }
+
+       @Test
+       public void testFutureBecomesAvailableAfterNotifyAvailable() throws InterruptedException {
+               final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
+               final CompletableFuture<?> future = queue.getAvailabilityFuture();
+               queue.notifyAvailable();
+               assertTrue(future.isDone());
+       }
+
+       @Test
+       public void testPollResetsAvailability() throws InterruptedException {
+               final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
+               queue.notifyAvailable();
+
+               final CompletableFuture<?> beforePoll = queue.getAvailabilityFuture();
+               queue.poll();
+               final CompletableFuture<?> afterPoll = queue.getAvailabilityFuture();
+
+               assertTrue(beforePoll.isDone());
+               assertFalse(afterPoll.isDone());
        }
 
        /**

Reply via email to