This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 997fc5f0921abf89cac6fb0ccbf0da2aaf669228
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Mon Sep 14 23:53:21 2020 +0200

    [FLINK-17393][connectors] Wakeup the SplitFetchers more elegantly.
    
    This closes #13366
---
 .../base/source/reader/SourceReaderBase.java       |   3 +-
 .../base/source/reader/fetcher/FetchTask.java      |  30 +--
 .../base/source/reader/fetcher/SplitFetcher.java   |  26 +-
 .../source/reader/fetcher/SplitFetcherManager.java |   3 +-
 .../FutureCompletingBlockingQueue.java             | 268 +++++++++++++++++----
 .../source/reader/fetcher/SplitFetcherTest.java    |   8 +-
 .../source/reader/mocks/TestingSplitReader.java    |  13 +-
 .../FutureCompletingBlockingQueueTest.java         | 117 ++++++++-
 8 files changed, 378 insertions(+), 90 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 02b7a7c..979afb2 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -43,7 +43,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkState;
@@ -66,7 +65,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends 
SourceSplit, SplitSt
        private final FutureNotifier futureNotifier;
 
        /** A queue to buffer the elements fetched by the fetcher thread. */
-       private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
+       private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue;
 
        /** The state of the splits. */
        private final Map<String, SplitContext<T, SplitStateT>> splitStates;
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
index 30835ce..530add1 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
@@ -21,10 +21,10 @@ package 
org.apache.flink.connector.base.source.reader.fetcher;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.concurrent.BlockingQueue;
 import java.util.function.Consumer;
 
 /**
@@ -32,22 +32,22 @@ import java.util.function.Consumer;
  */
 class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
        private final SplitReader<E, SplitT> splitReader;
-       private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
+       private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue;
        private final Consumer<Collection<String>> splitFinishedCallback;
-       private final Thread runningThread;
+       private final int fetcherIndex;
        private volatile RecordsWithSplitIds<E> lastRecords;
        private volatile boolean wakeup;
 
        FetchTask(
-               SplitReader<E, SplitT> splitReader,
-               BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
-               Consumer<Collection<String>> splitFinishedCallback,
-               Thread runningThread) {
+                       SplitReader<E, SplitT> splitReader,
+                       FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
+                       Consumer<Collection<String>> splitFinishedCallback,
+                       int fetcherIndex) {
                this.splitReader = splitReader;
                this.elementsQueue = elementsQueue;
                this.splitFinishedCallback = splitFinishedCallback;
                this.lastRecords = null;
-               this.runningThread = runningThread;
+               this.fetcherIndex = fetcherIndex;
                this.wakeup = false;
        }
 
@@ -61,10 +61,11 @@ class FetchTask<E, SplitT extends SourceSplit> implements 
SplitFetcherTask {
                        if (!isWakenUp()) {
                                // The order matters here. We must first put 
the last records into the queue.
                                // This ensures the handling of the fetched 
records is atomic to wakeup.
-                               elementsQueue.put(lastRecords);
-                               // The callback does not throw 
InterruptedException.
-                               
splitFinishedCallback.accept(lastRecords.finishedSplits());
-                               lastRecords = null;
+                               if (elementsQueue.put(fetcherIndex, 
lastRecords)) {
+                                       // The callback does not throw 
InterruptedException.
+                                       
splitFinishedCallback.accept(lastRecords.finishedSplits());
+                                       lastRecords = null;
+                               }
                        }
                } finally {
                        // clean up the potential wakeup effect. It is possible 
that the fetcher is waken up
@@ -72,7 +73,6 @@ class FetchTask<E, SplitT extends SourceSplit> implements 
SplitFetcherTask {
                        // running thread will be interrupted. The next 
invocation of run() will see that and
                        // just skip.
                        if (isWakenUp()) {
-                               Thread.interrupted();
                                wakeup = false;
                        }
                }
@@ -93,12 +93,12 @@ class FetchTask<E, SplitT extends SourceSplit> implements 
SplitFetcherTask {
                        splitReader.wakeUp();
                } else {
                        // The task might be blocking on enqueuing the records, 
just interrupt.
-                       runningThread.interrupt();
+                       elementsQueue.wakeUpPuttingThread(fetcherIndex);
                }
        }
 
        private boolean isWakenUp() {
-               return wakeup || runningThread.isInterrupted();
+               return wakeup;
        }
 
        @Override
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index 35deeba..fa1442e 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,7 +34,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -50,21 +50,20 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
        private final Map<String, SplitT> assignedSplits;
        /** The current split assignments for this fetcher. */
        private final Queue<SplitsChange<SplitT>> splitChanges;
-       private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
+       private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue;
        private final SplitReader<E, SplitT> splitReader;
        private final Runnable shutdownHook;
        private final AtomicBoolean wakeUp;
        private final AtomicBoolean closed;
        private FetchTask<E, SplitT> fetchTask;
-       private volatile Thread runningThread;
        private volatile SplitFetcherTask runningTask = null;
        private volatile boolean isIdle;
 
        SplitFetcher(
-               int id,
-               BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
-               SplitReader<E, SplitT> splitReader,
-               Runnable shutdownHook) {
+                       int id,
+                       FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
+                       SplitReader<E, SplitT> splitReader,
+                       Runnable shutdownHook) {
 
                this.id = id;
                this.taskQueue = new LinkedBlockingDeque<>();
@@ -83,14 +82,13 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
                LOG.info("Starting split fetcher {}", id);
                try {
                        // Remove the split from the assignments if it is 
already done.
-                       runningThread = Thread.currentThread();
                        this.fetchTask = new FetchTask<>(
-                               splitReader,
-                               elementsQueue,
-                               ids -> {
-                                       ids.forEach(this::removeAssignedSplit);
-                                       updateIsIdle();
-                               }, runningThread);
+                                       splitReader,
+                                       elementsQueue,
+                                       ids -> {
+                                               
ids.forEach(assignedSplits::remove);
+                                               updateIsIdle();
+                                       }, id);
                        while (!closed.get()) {
                                runOnce();
                        }
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
index 61bada1..822a9a9 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -66,7 +65,7 @@ public abstract class SplitFetcherManager<E, SplitT extends 
SourceSplit> {
        private final AtomicReference<Throwable> uncaughtFetcherException;
 
        /** The element queue that the split fetchers will put elements into. */
-       private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
+       private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue;
 
        /** A map keeping track of all the split fetchers. */
        protected final Map<Integer, SplitFetcher<E, SplitT>> fetchers;
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
index de51af1..ea0f030 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
@@ -18,83 +18,257 @@
 
 package org.apache.flink.connector.base.source.reader.synchronization;
 
-import java.util.Collection;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Queue;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * A BlockingQueue that allows a consuming thread to be notified 
asynchronously on element
- * availability when the queue is empty.
- *
- * <p>Implementation wise, it is a subclass of {@link LinkedBlockingQueue} 
that ensures all
- * the methods adding elements into the queue will complete the elements 
availability future.
- *
- * <p>The overriding methods must first put the elements into the queue then 
check and complete
- * the future if needed. This is required to ensure the thread waiting for 
more messages will
- * not lose a notification.
+ * A custom implementation of blocking queue with the following features.
+ * <ul>
+ *     <li>
+ *         It allows a consuming thread to be notified asynchronously on 
element availability when the
+ *         queue is empty.
+ *     </li>
+ *     <li>
+ *         Allows the putting threads to be gracefully woken up without 
interruption.
+ *     </li>
+ * </ul>
  *
  * @param <T> the type of the elements in the queue.
  */
-public class FutureCompletingBlockingQueue<T> extends LinkedBlockingQueue<T> {
-
+public class FutureCompletingBlockingQueue<T> {
+       private final int capacity;
        private final FutureNotifier futureNotifier;
+
+       /** The element queue. */
+       private final Queue<T> queue;
+       /** The lock for synchronization. */
+       private final Lock lock;
+       /** The per-thread conditions that are waiting on putting elements. */
+       private final Queue<Condition> notFull;
+       /** The shared conditions for getting elements. */
+       private final Condition notEmpty;
+       /** The per-thread conditions and wakeUp flags. */
+       private ConditionAndFlag[] putConditionAndFlags;
+
        /**
-        * The default capacity for {@link LinkedBlockingQueue}.
+        * The default capacity for the queue.
         */
-       private static final Integer DEFAULT_CAPACITY = 10000;
+       private static final Integer DEFAULT_CAPACITY = 1;
 
        public FutureCompletingBlockingQueue(FutureNotifier futureNotifier) {
                this(futureNotifier, DEFAULT_CAPACITY);
        }
 
        public FutureCompletingBlockingQueue(FutureNotifier futureNotifier, int 
capacity) {
-               super(capacity);
+               this.capacity = capacity;
                this.futureNotifier = futureNotifier;
+               this.queue = new ArrayDeque<>(capacity);
+               this.lock = new ReentrantLock();
+               this.putConditionAndFlags = new ConditionAndFlag[1];
+               this.notFull = new ArrayDeque<>();
+               this.notEmpty = lock.newCondition();
+       }
+
+       /**
+        * Put an element into the queue. The thread blocks if the queue is 
full.
+        *
+        * @param threadIndex the index of the thread.
+        * @param element the element to put.
+        * @return true if the element has been successfully put into the 
queue, false otherwise.
+        * @throws InterruptedException when the thread is interrupted.
+        */
+       public boolean put(int threadIndex, T element) throws 
InterruptedException {
+               if (element == null) {
+                       throw new NullPointerException();
+               }
+               lock.lockInterruptibly();
+               try {
+                       while (queue.size() >= capacity) {
+                               if (getAndResetWakeUpFlag(threadIndex)) {
+                                       return false;
+                               }
+                               waitOnPut(threadIndex);
+                       }
+                       enqueue(element);
+                       return true;
+               } finally {
+                       lock.unlock();
+               }
        }
 
-       @Override
-       public void put(T t) throws InterruptedException {
-               super.put(t);
+       /**
+        * Get and remove the first element from the queue. The call blocks if 
the queue is empty.
+        *
+        * @return the first element in the queue.
+        * @throws InterruptedException when the thread is interrupted.
+        */
+       public T take() throws InterruptedException{
+               lock.lock();
+               try {
+                       while (queue.size() == 0) {
+                               notEmpty.await();
+                       }
+                       return dequeue();
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       /**
+        * Get and remove the first element from the queue. Null is returned if 
the queue is empty.
+        *
+        * @return the first element from the queue, or Null if the queue is 
empty.
+        */
+       public T poll() {
+               lock.lock();
+               try {
+                       if (queue.size() == 0) {
+                               return null;
+                       }
+                       return dequeue();
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       /**
+        * Get the first element from the queue without removing it.
+        *
+        * @return the first element in the queue, or Null if the queue is 
empty.
+        */
+       public T peek() {
+               lock.lock();
+               try {
+                       return queue.peek();
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       public int size() {
+               lock.lock();
+               try {
+                       return queue.size();
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       public boolean isEmpty() {
+               lock.lock();
+               try {
+                       return queue.isEmpty();
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       public int remainingCapacity() {
+               lock.lock();
+               try {
+                       return capacity - queue.size();
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       public void wakeUpPuttingThread(int threadIndex) {
+               lock.lock();
+               try {
+                       maybeCreateCondition(threadIndex);
+                       ConditionAndFlag caf = 
putConditionAndFlags[threadIndex];
+                       if (caf != null) {
+                               caf.setWakeUp(true);
+                               caf.condition().signal();
+                       }
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       // --------------- private helpers -------------------------
+
+       private void enqueue(T element) {
+               int sizeBefore = queue.size();
+               queue.add(element);
                futureNotifier.notifyComplete();
+               if (sizeBefore == 0) {
+                       notEmpty.signal();
+               }
+               if (sizeBefore < capacity - 1 && !notFull.isEmpty()) {
+                       signalNextPutter();
+               }
        }
 
-       @Override
-       public boolean offer(T t, long timeout, TimeUnit unit) throws 
InterruptedException {
-               if (super.offer(t, timeout, unit)) {
-                       futureNotifier.notifyComplete();
-                       return true;
-               } else {
-                       return false;
+       private T dequeue() {
+               int sizeBefore = queue.size();
+               T element = queue.poll();
+               if (sizeBefore == capacity && !notFull.isEmpty()) {
+                       signalNextPutter();
                }
+               if (sizeBefore > 1) {
+                       notEmpty.signal();
+               }
+               return element;
        }
 
-       @Override
-       public boolean offer(T t) {
-               if (super.offer(t)) {
-                       futureNotifier.notifyComplete();
-                       return true;
-               } else {
-                       return false;
+       private void waitOnPut(int fetcherIndex) throws InterruptedException {
+               maybeCreateCondition(fetcherIndex);
+               Condition cond = putConditionAndFlags[fetcherIndex].condition();
+               notFull.add(cond);
+               cond.await();
+       }
+
+       private void signalNextPutter() {
+               if (!notFull.isEmpty()) {
+                       notFull.poll().signal();
                }
        }
 
-       @Override
-       public boolean add(T t) {
-               if (super.add(t)) {
-                       futureNotifier.notifyComplete();
-                       return true;
-               } else {
-                       return false;
+       private void maybeCreateCondition(int threadIndex) {
+               if (putConditionAndFlags.length < threadIndex + 1) {
+                       putConditionAndFlags = 
Arrays.copyOf(putConditionAndFlags, threadIndex + 1);
+               }
+
+               if (putConditionAndFlags[threadIndex] == null) {
+                       putConditionAndFlags[threadIndex] = new 
ConditionAndFlag(lock.newCondition());
                }
        }
 
-       @Override
-       public boolean addAll(Collection<? extends T> c) {
-               if (super.addAll(c)) {
-                       futureNotifier.notifyComplete();
+       private boolean getAndResetWakeUpFlag(int threadIndex) {
+               maybeCreateCondition(threadIndex);
+               if (putConditionAndFlags[threadIndex].getWakeUp()) {
+                       putConditionAndFlags[threadIndex].setWakeUp(false);
                        return true;
-               } else {
-                       return false;
+               }
+               return false;
+       }
+
+       // --------------- private per thread state ------------
+
+       private static class ConditionAndFlag {
+               private final Condition cond;
+               private boolean wakeUp;
+
+               private ConditionAndFlag(Condition cond) {
+                       this.cond = cond;
+                       this.wakeUp = false;
+               }
+
+               private Condition condition() {
+                       return cond;
+               }
+
+               private boolean getWakeUp() {
+                       return wakeUp;
+               }
+
+               private void setWakeUp(boolean value) {
+                       wakeUp = value;
                }
        }
 }
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index e9c2ad2..eef8328 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.connector.base.source.reader.fetcher;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
 
 import org.junit.Test;
 
@@ -29,8 +31,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.SortedSet;
 import java.util.TreeSet;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -45,9 +45,11 @@ public class SplitFetcherTest {
        private static final int NUM_RECORDS_PER_SPLIT = 10_000;
        private static final int INTERRUPT_RECORDS_INTERVAL = 10;
        private static final int NUM_TOTAL_RECORDS = NUM_RECORDS_PER_SPLIT * 
NUM_SPLITS;
+
        @Test
        public void testWakeup() throws InterruptedException {
-               BlockingQueue<RecordsWithSplitIds<int[]>> elementQueue = new 
ArrayBlockingQueue<>(1);
+               FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> 
elementQueue =
+                       new FutureCompletingBlockingQueue<>(new 
FutureNotifier(), 1);
                SplitFetcher<int[], MockSourceSplit> fetcher =
                                new SplitFetcher<>(
                                                0,
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
index ede92eb..0d202f7 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
@@ -46,11 +46,10 @@ public class TestingSplitReader<E, SplitT extends 
SourceSplit> implements SplitR
                if (!fetches.isEmpty()) {
                        return fetches.removeFirst();
                } else {
-                       // block until interrupted
+                       // block until woken up
                        synchronized (fetches) {
-                               while (true) {
-                                       fetches.wait();
-                               }
+                               fetches.wait();
+                               return null;
                        }
                }
        }
@@ -61,5 +60,9 @@ public class TestingSplitReader<E, SplitT extends 
SourceSplit> implements SplitR
        }
 
        @Override
-       public void wakeUp() {}
+       public void wakeUp() {
+               synchronized (fetches) {
+                       fetches.notifyAll();
+               }
+       }
 }
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
index ad74f2a..c1bde50 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
@@ -20,16 +20,129 @@ package 
org.apache.flink.connector.base.source.reader.synchronization;
 
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * The unit test for {@link FutureCompletingBlockingQueue}.
  */
 public class FutureCompletingBlockingQueueTest {
+       private static final Integer DEFAULT_CAPACITY = 1;
+       private static final Integer SPECIFIED_CAPACITY = 20000;
+
+       @Test
+       public void testBasics() throws InterruptedException {
+               FutureNotifier futureNotifier = new FutureNotifier();
+               FutureCompletingBlockingQueue<Integer> queue = new 
FutureCompletingBlockingQueue<>(futureNotifier, 5);
 
+               CompletableFuture<Void> future = futureNotifier.future();
+               assertTrue(queue.isEmpty());
+               assertEquals(0, queue.size());
 
-       private static final Integer DEFAULT_CAPACITY = 10000;
-       private static final Integer SPECIFIED_CAPACITY = 20000;
+               queue.put(0, 1234);
+
+               assertTrue(future.isDone());
+               assertEquals(1, queue.size());
+               assertFalse(queue.isEmpty());
+               assertEquals(4, queue.remainingCapacity());
+               assertNotNull(queue.peek());
+               assertEquals(1234, (int) queue.peek());
+               assertEquals(1234, (int) queue.poll());
+
+               assertEquals(0, queue.size());
+               assertTrue(queue.isEmpty());
+               assertEquals(5, queue.remainingCapacity());
+       }
+
+       @Test
+       public void testPoll() throws InterruptedException {
+               FutureNotifier futureNotifier = new FutureNotifier();
+               FutureCompletingBlockingQueue<Integer> queue = new 
FutureCompletingBlockingQueue<>(futureNotifier);
+               queue.put(0, 1234);
+               Integer value = queue.poll();
+               assertNotNull(value);
+               assertEquals(1234, (int) value);
+       }
+
+       @Test
+       public void testWakeUpPut() throws InterruptedException {
+               FutureNotifier futureNotifier = new FutureNotifier();
+               FutureCompletingBlockingQueue<Integer> queue = new 
FutureCompletingBlockingQueue<>(futureNotifier, 1);
+
+               CountDownLatch latch = new CountDownLatch(1);
+               new Thread(() -> {
+                       try {
+                               assertTrue(queue.put(0, 1234));
+                               assertFalse(queue.put(0, 1234));
+                               latch.countDown();
+                       } catch (InterruptedException e) {
+                               fail("Interrupted unexpectedly.");
+                       }
+               }).start();
+
+               queue.wakeUpPuttingThread(0);
+               latch.await();
+               assertEquals(0, latch.getCount());
+       }
+
+       @Test
+       public void testConcurrency() throws InterruptedException {
+               FutureNotifier futureNotifier = new FutureNotifier();
+               FutureCompletingBlockingQueue<Integer> queue = new 
FutureCompletingBlockingQueue<>(futureNotifier, 5);
+               final int numValuesPerThread = 10000;
+               final int numPuttingThreads = 5;
+               List<Thread> threads = new ArrayList<>();
+
+               for (int i = 0; i < numPuttingThreads; i++) {
+                       final int index = i;
+                       Thread t = new Thread(() -> {
+                               for (int j = 0; j < numValuesPerThread; j++) {
+                                       int base = index * numValuesPerThread;
+                                       try {
+                                               queue.put(index, base + j);
+                                       } catch (InterruptedException e) {
+                                               fail("putting thread 
interrupted.");
+                                       }
+                               }
+                       });
+                       t.start();
+                       threads.add(t);
+               }
+
+               BitSet bitSet = new BitSet();
+               AtomicInteger count = new AtomicInteger(0);
+               for (int i = 0; i < 5; i++) {
+                       Thread t = new Thread(() -> {
+                               while (count.get() < numPuttingThreads * 
numValuesPerThread) {
+                                       Integer value = queue.poll();
+                                       if (value == null) {
+                                               continue;
+                                       }
+                                       count.incrementAndGet();
+                                       if (bitSet.get(value)) {
+                                               fail("Value " + value + " has 
been consumed before");
+                                       }
+                                       synchronized (bitSet) {
+                                               bitSet.set(value);
+                                       }
+                               }});
+                       t.start();
+                       threads.add(t);
+               }
+               for (Thread t : threads) {
+                       t.join();
+               }
+       }
 
        @Test
        public void testFutureCompletingBlockingQueueConstructor() {

Reply via email to