This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0e821eaa483b0371ac1df1339f0d3c9ad7376976
Author: Stephan Ewen <[email protected]>
AuthorDate: Mon Sep 14 23:48:01 2020 +0200

    [FLINK-19223][connectors] Simplify Availability Future Model in Base 
Connector
    
    This implements a model closer to the AvailabilityListener and 
AvailabilityHelper in the flink-runtime.
    
    This closes #13385
---
 .../SingleThreadMultiplexSourceReaderBase.java     |   5 +-
 .../base/source/reader/SourceReaderBase.java       |  21 +-
 .../reader/fetcher/SingleThreadFetcherManager.java |   4 +-
 .../source/reader/fetcher/SplitFetcherManager.java |   5 +-
 .../FutureCompletingBlockingQueue.java             | 259 +++++++++++++++++----
 .../reader/synchronization/FutureNotifier.java     |  66 ------
 .../base/source/reader/SourceReaderBaseTest.java   |  13 +-
 .../source/reader/fetcher/SplitFetcherTest.java    |  36 ++-
 .../base/source/reader/mocks/MockBaseSource.java   |   5 +-
 .../base/source/reader/mocks/MockSourceReader.java |   6 +-
 .../FutureCompletingBlockingQueueTest.java         |  32 ++-
 .../reader/synchronization/FutureNotifierTest.java | 131 -----------
 12 files changed, 261 insertions(+), 322 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
index 3239f28..ab87db0 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import 
org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
 
 import java.util.function.Supplier;
 
@@ -40,16 +39,14 @@ public abstract class 
SingleThreadMultiplexSourceReaderBase<E, T, SplitT extends
        extends SourceReaderBase<E, T, SplitT, SplitStateT> {
 
        public SingleThreadMultiplexSourceReaderBase(
-               FutureNotifier futureNotifier,
                FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
                Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
                RecordEmitter<E, T, SplitStateT> recordEmitter,
                Configuration config,
                SourceReaderContext context) {
                super(
-                       futureNotifier,
                        elementsQueue,
-                       new SingleThreadFetcherManager<>(futureNotifier, 
elementsQueue, splitReaderSupplier),
+                       new SingleThreadFetcherManager<>(elementsQueue, 
splitReaderSupplier),
                        recordEmitter,
                        config,
                        context);
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 0305e2d..fb4e6df9 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -29,7 +29,6 @@ import 
org.apache.flink.connector.base.source.event.NoMoreSplitsEvent;
 import 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import 
org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
 import org.apache.flink.core.io.InputStatus;
 
 import org.slf4j.Logger;
@@ -61,9 +60,6 @@ public abstract class SourceReaderBase<E, T, SplitT extends 
SourceSplit, SplitSt
                implements SourceReader<T, SplitT> {
        private static final Logger LOG = 
LoggerFactory.getLogger(SourceReaderBase.class);
 
-       /** A future notifier to notify when this reader requires attention. */
-       private final FutureNotifier futureNotifier;
-
        /** A queue to buffer the elements fetched by the fetcher thread. */
        private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue;
 
@@ -94,13 +90,11 @@ public abstract class SourceReaderBase<E, T, SplitT extends 
SourceSplit, SplitSt
        private boolean noMoreSplitsAssignment;
 
        public SourceReaderBase(
-                       FutureNotifier futureNotifier,
                        FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
                        SplitFetcherManager<E, SplitT> splitFetcherManager,
                        RecordEmitter<E, T, SplitStateT> recordEmitter,
                        Configuration config,
                        SourceReaderContext context) {
-               this.futureNotifier = futureNotifier;
                this.elementsQueue = elementsQueue;
                this.splitFetcherManager = splitFetcherManager;
                this.recordEmitter = recordEmitter;
@@ -203,18 +197,7 @@ public abstract class SourceReaderBase<E, T, SplitT 
extends SourceSplit, SplitSt
 
        @Override
        public CompletableFuture<Void> isAvailable() {
-               // The order matters here. We first get the future. After this 
point, if the queue
-               // is empty or there is no error in the split fetcher manager, 
we can ensure that
-               // the future will be completed by the fetcher once it put an 
element into the element queue,
-               // or it will be completed when an error occurs.
-               CompletableFuture<Void> future = futureNotifier.future();
-               splitFetcherManager.checkErrors();
-               if (!elementsQueue.isEmpty()) {
-                       // The fetcher got the new elements after the last 
poll, or their is a finished split.
-                       // Simply complete the future and return;
-                       futureNotifier.notifyComplete();
-               }
-               return future;
+               return currentFetch != null ? 
FutureCompletingBlockingQueue.AVAILABLE : elementsQueue.getAvailabilityFuture();
        }
 
        @Override
@@ -239,7 +222,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends 
SourceSplit, SplitSt
                if (sourceEvent instanceof NoMoreSplitsEvent) {
                        LOG.info("Reader received NoMoreSplits event.");
                        noMoreSplitsAssignment = true;
-                       futureNotifier.notifyComplete();
+                       elementsQueue.notifyAvailable();
                }
        }
 
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
index bd5879f..339c533 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import 
org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
 
 import java.util.List;
 import java.util.function.Supplier;
@@ -34,10 +33,9 @@ public class SingleThreadFetcherManager<E, SplitT extends 
SourceSplit>
                extends SplitFetcherManager<E, SplitT> {
 
        public SingleThreadFetcherManager(
-                       FutureNotifier futureNotifier,
                        FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
                        Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
-               super(futureNotifier, elementsQueue, splitReaderSupplier);
+               super(elementsQueue, splitReaderSupplier);
        }
 
        @Override
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
index 26d92e3..ffac523 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
@@ -23,7 +23,6 @@ import 
org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.SourceReaderBase;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import 
org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
 import org.apache.flink.util.ThrowableCatchingRunnable;
 
 import org.slf4j.Logger;
@@ -79,12 +78,10 @@ public abstract class SplitFetcherManager<E, SplitT extends 
SourceSplit> {
        /**
         * Create a split fetcher manager.
         *
-        * @param futureNotifier a notifier to notify the complete of a future.
         * @param elementsQueue the queue that split readers will put elements 
into.
         * @param splitReaderFactory a supplier that could be used to create 
split readers.
         */
        public SplitFetcherManager(
-                       FutureNotifier futureNotifier,
                        FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
                        Supplier<SplitReader<E, SplitT>> splitReaderFactory) {
                this.elementsQueue = elementsQueue;
@@ -96,7 +93,7 @@ public abstract class SplitFetcherManager<E, SplitT extends 
SourceSplit> {
                                        // Add the exception to the exception 
list.
                                        
uncaughtFetcherException.get().addSuppressed(t);
                                        // Wake up the main thread to let it 
know the exception.
-                                       futureNotifier.notifyComplete();
+                                       elementsQueue.notifyAvailable();
                                }
                        }
                };
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
index dcbb66e..c89b682 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
@@ -18,62 +18,174 @@
 
 package org.apache.flink.connector.base.source.reader.synchronization;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.lang.reflect.Field;
 import java.util.ArrayDeque;
 import java.util.Arrays;
 import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
- * A custom implementation of blocking queue with the following features.
- * <ul>
- *     <li>
- *         It allows a consuming thread to be notified asynchronously on 
element availability when the
- *         queue is empty.
- *     </li>
- *     <li>
- *         Allows the putting threads to be gracefully waken up without 
interruption.
- *     </li>
- * </ul>
+ * A custom implementation of blocking queue in combination with a {@link 
CompletableFuture} that is
+ * used in the hand-over of data from a producing thread to a consuming thread.
+ * This FutureCompletingBlockingQueue has the following features:
+ *
+ * <h3>Consumer Notifications</h3>
+ *
+ * <p>Rather than letting consumers block on the {@link #take()} method, or 
have them poll the
+ * {@link #poll()} method, this queue offers a {@link CompletableFuture}, 
obtained via the
+ * {@link #getAvailabilityFuture()} method) that gets completed whenever the 
queue is non-empty.
+ * A consumer can thus subscribe to asynchronous notifications for 
availability by adding a handler
+ * to the obtained {@code CompletableFuture}.
+ *
+ * <p>The future may also be completed by an explicit call to {@link 
#notifyAvailable()}. That way the
+ * consumer may be notified of a situation/condition without adding an element 
to the queue.
+ *
+ * <p>Availability is reset when a call to {@link #poll()} (or {@link #take()} 
finds an empty queue
+ * or results in an empty queue (takes the last element).
+ *
+ * <p>Note that this model generally assumes that <i>false positives</i> are 
okay, meaning that the
+ * availability future completes despite there being no data availabile in the 
queue. The consumer is
+ * responsible for polling data and obtaining another future to wait on. This 
is similar to the way
+ * that Java's Monitors and Conditions can have the <i>spurious wakeup</i> of 
the waiting threads
+ * and commonly need to be used in loop with the waiting condition.
+ *
+ * <h3>Producer Wakeup</h3>
+ *
+ * <p>The queue supports gracefully waking up producing threads that are 
blocked due to the queue
+ * capacity limits, without interrupting the thread. This is done via the 
{@link #wakeUpPuttingThread(int)}
+ * method.
  *
  * @param <T> the type of the elements in the queue.
  */
 public class FutureCompletingBlockingQueue<T> {
+
+       /**
+        * A constant future that is complete, indicating availability. Using 
this constant in cases that
+        * are guaranteed available helps short-circuiting some checks and 
avoiding volatile memory operations.
+        */
+       public static final CompletableFuture<Void> AVAILABLE = 
getAvailableFuture();
+
+       /**
+        * The default capacity for the queue.
+        */
+       private static final int DEFAULT_CAPACITY = 1;
+
+       // 
------------------------------------------------------------------------
+
+       /** The maximum capacity of the queue. */
        private final int capacity;
-       private final FutureNotifier futureNotifier;
 
-       /** The element queue. */
-       private final Queue<T> queue;
+       /** The availability future. This doubles as a "non empty" condition. 
This value is never null.*/
+       private CompletableFuture<Void> currentFuture;
+
        /** The lock for synchronization. */
        private final Lock lock;
+
+       /** The element queue. */
+       @GuardedBy("lock")
+       private final Queue<T> queue;
+
        /** The per-thread conditions that are waiting on putting elements. */
+       @GuardedBy("lock")
        private final Queue<Condition> notFull;
-       /** The shared conditions for getting elements. */
-       private final Condition notEmpty;
+
        /** The per-thread conditions and wakeUp flags. */
+       @GuardedBy("lock")
        private ConditionAndFlag[] putConditionAndFlags;
 
-       /**
-        * The default capacity for the queue.
-        */
-       private static final Integer DEFAULT_CAPACITY = 1;
-
-       public FutureCompletingBlockingQueue(FutureNotifier futureNotifier) {
-               this(futureNotifier, DEFAULT_CAPACITY);
+       public FutureCompletingBlockingQueue() {
+               this(DEFAULT_CAPACITY);
        }
 
-       public FutureCompletingBlockingQueue(FutureNotifier futureNotifier, int 
capacity) {
+       public FutureCompletingBlockingQueue(int capacity) {
+               checkArgument(capacity > 0, "capacity must be > 0");
                this.capacity = capacity;
-               this.futureNotifier = futureNotifier;
                this.queue = new ArrayDeque<>(capacity);
                this.lock = new ReentrantLock();
                this.putConditionAndFlags = new ConditionAndFlag[1];
                this.notFull = new ArrayDeque<>();
-               this.notEmpty = lock.newCondition();
+
+               // initially the queue is empty and thus unavailable
+               this.currentFuture = new CompletableFuture<>();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Future / Notification logic
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Returns the availability future. If the queue is non-empty, then 
this future will already
+        * be complete. Otherwise the obtained future is guaranteed to get 
completed the next time
+        * the queue becomes non-empty, or a notification happens via {@link 
#notifyAvailable()}.
+        *
+        * <p>It is important that a completed future is no guarantee that the 
next call to
+        * {@link #poll()} will return a non-null element. If there are 
concurrent consumer, another
+        * consumer may have taken the available element. Or there was no 
element in the first place,
+        * because the future was completed through a call to {@link 
#notifyAvailable()}.
+        *
+        * <p>For that reason, it is important to call this method (to obtain a 
new future) every
+        * time again after {@link #poll()} returned null and you want to wait 
for data.
+        */
+       public CompletableFuture<Void> getAvailabilityFuture() {
+               return currentFuture;
+       }
+
+       /**
+        * Makes sure the availability future is complete, if it is not 
complete already.
+        * All futures returned by previous calls to {@link 
#getAvailabilityFuture()} are guaranteed to
+        * be completed.
+        *
+        * <p>All future calls to the method will return a completed future, 
until the point
+        * that the availability is reset via calls to {@link #poll()} that 
leave the queue empty.
+        */
+       public void notifyAvailable() {
+               lock.lock();
+               try {
+                       moveToAvailable();
+               } finally {
+                       lock.unlock();
+               }
+       }
+
+       /**
+        * Internal utility to make sure that the current future futures are 
complete (until reset).
+        */
+       @GuardedBy("lock")
+       private void moveToAvailable() {
+               final CompletableFuture<Void> current = currentFuture;
+               if (current != AVAILABLE) {
+                       currentFuture = AVAILABLE;
+                       current.complete(null);
+               }
        }
 
        /**
+        * Makes sure the availability future is incomplete, if it was complete 
before.
+        */
+       @GuardedBy("lock")
+       private void moveToUnAvailable() {
+               if (currentFuture == AVAILABLE) {
+                       currentFuture = new CompletableFuture<>();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Blocking Queue Logic
+       // 
------------------------------------------------------------------------
+
+       /**
         * Put an element into the queue. The thread blocks if the queue is 
full.
         *
         * @param threadIndex the index of the thread.
@@ -101,25 +213,40 @@ public class FutureCompletingBlockingQueue<T> {
        }
 
        /**
-        * Get and remove the first element from the queue. The call blocks if 
the queue is empty.
+        * <b>Warning:</b> This is a dangerous method and should only be used 
for testing convenience.
+        * A method that blocks until availability does not go together well 
with the concept of
+        * asynchronous notifications and non-blocking polling.
+        *
+        * <p>Get and remove the first element from the queue. The call blocks 
if the queue is empty.
+        * The problem with this method is that it may loop internally until an 
element is available and
+        * that way eagerly reset the availability future. If a consumer thread 
is blocked in taking an
+        * element, it will receive availability notifications from {@link 
#notifyAvailable()} and immediately
+        * reset them by calling {@link #poll()} and finding the queue empty.
         *
         * @return the first element in the queue.
         * @throws InterruptedException when the thread is interrupted.
         */
-       public T take() throws InterruptedException{
-               lock.lock();
-               try {
-                       while (queue.size() == 0) {
-                               notEmpty.await();
+       @VisibleForTesting
+       public T take() throws InterruptedException {
+               T next;
+               while ((next = poll()) == null) {
+                       // use the future to wait for availability to avoid 
busy waiting
+                       try {
+                               getAvailabilityFuture().get();
+                       } catch (ExecutionException | CompletionException e) {
+                               // this should never happen, but we propagate 
just in case
+                               throw new FlinkRuntimeException("exception in 
queue future completion", e);
                        }
-                       return dequeue();
-               } finally {
-                       lock.unlock();
                }
+               return next;
        }
 
        /**
-        * Get and remove the first element from the queue. Null is retuned if 
the queue is empty.
+        * Get and remove the first element from the queue. Null is returned if 
the queue is empty.
+        * If this makes the queue empty (takes the last element) or finds the 
queue already empty,
+        * then this resets the availability notifications. The next call to 
{@link #getAvailabilityFuture()}
+        * will then return a non-complete future that completes only the next 
time that the queue
+        * becomes non-empty or the {@link #notifyAvailable()} method is called.
         *
         * @return the first element from the queue, or Null if the queue is 
empty.
         */
@@ -127,6 +254,7 @@ public class FutureCompletingBlockingQueue<T> {
                lock.lock();
                try {
                        if (queue.size() == 0) {
+                               moveToUnAvailable();
                                return null;
                        }
                        return dequeue();
@@ -149,6 +277,9 @@ public class FutureCompletingBlockingQueue<T> {
                }
        }
 
+       /**
+        * Gets the size of the queue.
+        */
        public int size() {
                lock.lock();
                try {
@@ -158,6 +289,9 @@ public class FutureCompletingBlockingQueue<T> {
                }
        }
 
+       /**
+        * Checks whether the queue is empty.
+        */
        public boolean isEmpty() {
                lock.lock();
                try {
@@ -167,6 +301,10 @@ public class FutureCompletingBlockingQueue<T> {
                }
        }
 
+       /**
+        * Checks the remaining capacity in the queue. That is the difference 
between the maximum capacity
+        * and the current number of elements in the queue.
+        */
        public int remainingCapacity() {
                lock.lock();
                try {
@@ -176,6 +314,16 @@ public class FutureCompletingBlockingQueue<T> {
                }
        }
 
+       /**
+        * Gracefully wakes up the thread with the given {@code threadIndex} if 
it is blocked in
+        * adding an element. to the queue. If the thread is blocked in {@link 
#put(int, Object)} it will
+        * immediately return from the method with a return value of false.
+        *
+        * <p>If this method is called, the next time the thread with the given 
index is about to be blocked
+        * in adding an element, it may immediately wake up and return.
+        *
+        * @param threadIndex The number identifying the thread.
+        */
        public void wakeUpPuttingThread(int threadIndex) {
                lock.lock();
                try {
@@ -190,36 +338,34 @@ public class FutureCompletingBlockingQueue<T> {
                }
        }
 
-       public void notifyAvailable() {
-               futureNotifier.notifyComplete();
-       }
-
        // --------------- private helpers -------------------------
 
+       @GuardedBy("lock")
        private void enqueue(T element) {
-               int sizeBefore = queue.size();
+               final int sizeBefore = queue.size();
                queue.add(element);
-               futureNotifier.notifyComplete();
                if (sizeBefore == 0) {
-                       notEmpty.signal();
+                       moveToAvailable();
                }
                if (sizeBefore < capacity - 1 && !notFull.isEmpty()) {
                        signalNextPutter();
                }
        }
 
+       @GuardedBy("lock")
        private T dequeue() {
-               int sizeBefore = queue.size();
-               T element = queue.poll();
+               final int sizeBefore = queue.size();
+               final T element = queue.poll();
                if (sizeBefore == capacity && !notFull.isEmpty()) {
                        signalNextPutter();
                }
-               if (sizeBefore > 1) {
-                       notEmpty.signal();
+               if (queue.isEmpty()) {
+                       moveToUnAvailable();
                }
                return element;
        }
 
+       @GuardedBy("lock")
        private void waitOnPut(int fetcherIndex) throws InterruptedException {
                maybeCreateCondition(fetcherIndex);
                Condition cond = putConditionAndFlags[fetcherIndex].condition();
@@ -227,12 +373,14 @@ public class FutureCompletingBlockingQueue<T> {
                cond.await();
        }
 
+       @GuardedBy("lock")
        private void signalNextPutter() {
                if (!notFull.isEmpty()) {
                        notFull.poll().signal();
                }
        }
 
+       @GuardedBy("lock")
        private void maybeCreateCondition(int threadIndex) {
                if (putConditionAndFlags.length < threadIndex + 1) {
                        putConditionAndFlags = 
Arrays.copyOf(putConditionAndFlags, threadIndex + 1);
@@ -243,6 +391,7 @@ public class FutureCompletingBlockingQueue<T> {
                }
        }
 
+       @GuardedBy("lock")
        private boolean getAndResetWakeUpFlag(int threadIndex) {
                maybeCreateCondition(threadIndex);
                if (putConditionAndFlags[threadIndex].getWakeUp()) {
@@ -275,4 +424,22 @@ public class FutureCompletingBlockingQueue<T> {
                        wakeUp = value;
                }
        }
+
+       // 
------------------------------------------------------------------------
+       //  utilities
+       // 
------------------------------------------------------------------------
+
+       @SuppressWarnings("unchecked")
+       private static CompletableFuture<Void> getAvailableFuture() {
+               // this is a way to obtain the AvailabilityProvider.AVAILABLE 
future until we decide to
+               // move the class from the runtime module to the core module
+               try {
+                       final Class<?> clazz = 
Class.forName("org.apache.flink.runtime.io.AvailabilityProvider");
+                       final Field field = clazz.getDeclaredField("AVAILABLE");
+                       return (CompletableFuture<Void>) field.get(null);
+               }
+               catch (Throwable t) {
+                       return CompletableFuture.completedFuture(null);
+               }
+       }
 }
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifier.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifier.java
deleted file mode 100644
index 9330407..0000000
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifier.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.base.source.reader.synchronization;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * A class facilitating the asynchronous communication among threads.
- */
-public class FutureNotifier {
-       /** A future reference. */
-       private final AtomicReference<CompletableFuture<Void>> futureRef;
-
-       public FutureNotifier() {
-               this.futureRef = new AtomicReference<>(null);
-       }
-
-       /**
-        * Get the future out of this notifier. The future will be completed 
when someone invokes
-        * {@link #notifyComplete()}. If there is already an uncompleted 
future, that existing
-        * future will be returned instead of a new one.
-        *
-        * @return a future that will be completed when {@link 
#notifyComplete()} is invoked.
-        */
-       public CompletableFuture<Void> future() {
-               CompletableFuture<Void> prevFuture = futureRef.get();
-               if (prevFuture != null) {
-                       // Someone has created a future for us, don't create a 
new one.
-                       return prevFuture;
-               } else {
-                       CompletableFuture<Void> newFuture = new 
CompletableFuture<>();
-                       boolean newFutureSet = futureRef.compareAndSet(null, 
newFuture);
-                       // If someone created a future after our previous 
check, use that future.
-                       // Otherwise, use the new future.
-                       return newFutureSet ? newFuture : future();
-               }
-       }
-
-       /**
-        * Complete the future if there is one. This will release the thread 
that is waiting for data.
-        */
-       public void notifyComplete() {
-               CompletableFuture<Void> future = futureRef.get();
-               // If there are multiple threads trying to complete the future, 
only the first one succeeds.
-               if (future != null && future.complete(null)) {
-                       futureRef.compareAndSet(future, null);
-               }
-       }
-}
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
index 0ec4297..84eeb4e 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
@@ -33,7 +33,6 @@ import 
org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import 
org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -62,12 +61,10 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
                expectedException.expectMessage("One or more fetchers have 
encountered exception");
                final String errMsg = "Testing Exception";
 
-               FutureNotifier futureNotifier = new FutureNotifier();
                FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> 
elementsQueue =
-                       new FutureCompletingBlockingQueue<>(futureNotifier);
+                       new FutureCompletingBlockingQueue<>();
                // We have to handle split changes first, otherwise fetch will 
not be called.
                try (MockSourceReader reader = new MockSourceReader(
-                       futureNotifier,
                        elementsQueue,
                        () -> new SplitReader<int[], MockSourceSplit>() {
                                @Override
@@ -127,13 +124,11 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
 
        @Override
        protected MockSourceReader createReader() {
-               FutureNotifier futureNotifier = new FutureNotifier();
                FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> 
elementsQueue =
-                       new FutureCompletingBlockingQueue<>(futureNotifier);
+                       new FutureCompletingBlockingQueue<>();
                MockSplitReader mockSplitReader =
                        new MockSplitReader(2, true, true);
                return new MockSourceReader(
-                       futureNotifier,
                        elementsQueue,
                        () -> mockSplitReader,
                        getConfig(),
@@ -183,12 +178,10 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
                final String splitId,
                final RecordsWithSplitIds<E> records) throws Exception {
 
-               final FutureNotifier futureNotifier = new FutureNotifier();
                final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue =
-                       new FutureCompletingBlockingQueue<>(futureNotifier);
+                       new FutureCompletingBlockingQueue<>();
 
                final SourceReader<E, TestingSourceSplit> reader = new 
SingleThreadMultiplexSourceReaderBase<E, E, TestingSourceSplit, 
TestingSourceSplit>(
-                       futureNotifier,
                        elementsQueue,
                        () -> new TestingSplitReader<E, 
TestingSourceSplit>(records),
                        new PassThroughRecordEmitter<E, TestingSourceSplit>(),
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index 6e27d95..c25490b 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -26,7 +26,6 @@ import 
org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit;
 import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import 
org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
 import org.apache.flink.core.testutils.CheckedThread;
 
 import org.junit.Test;
@@ -84,28 +83,28 @@ public class SplitFetcherTest {
 
        @Test
        public void testNotifiesWhenGoingIdle() {
-               final FutureNotifier notifier = new FutureNotifier();
+               final 
FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue = new 
FutureCompletingBlockingQueue<>();
                final SplitFetcher<Object, TestingSourceSplit> fetcher = 
createFetcherWithSplit(
                        "test-split",
-                       new FutureCompletingBlockingQueue<>(notifier),
+                       queue,
                        new 
TestingSplitReader<>(finishedSplitFetch("test-split")));
 
                fetcher.runOnce();
 
                assertTrue(fetcher.assignedSplits().isEmpty());
                assertTrue(fetcher.isIdle());
-               assertTrue(notifier.future().isDone());
+               assertTrue(queue.getAvailabilityFuture().isDone());
        }
 
        @Test
        public void testNotifiesOlderFutureWhenGoingIdle() {
-               final FutureNotifier notifier = new FutureNotifier();
+               final 
FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue = new 
FutureCompletingBlockingQueue<>();
                final SplitFetcher<Object, TestingSourceSplit> fetcher = 
createFetcherWithSplit(
-                       "test-split",
-                       new FutureCompletingBlockingQueue<>(notifier),
-                       new 
TestingSplitReader<>(finishedSplitFetch("test-split")));
+                               "test-split",
+                               queue,
+                               new 
TestingSplitReader<>(finishedSplitFetch("test-split")));
 
-               final CompletableFuture<?> future = notifier.future();
+               final CompletableFuture<?> future = 
queue.getAvailabilityFuture();
 
                fetcher.runOnce();
 
@@ -116,9 +115,8 @@ public class SplitFetcherTest {
 
        @Test
        public void testNotifiesWhenGoingIdleConcurrent() throws Exception {
-               final FutureNotifier notifier = new FutureNotifier();
                final 
FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue =
-                               new FutureCompletingBlockingQueue<>(notifier);
+                               new FutureCompletingBlockingQueue<>();
                final SplitFetcher<Object, TestingSourceSplit> fetcher = 
createFetcherWithSplit(
                        "test-split", queue, new 
TestingSplitReader<>(finishedSplitFetch("test-split")));
 
@@ -128,7 +126,7 @@ public class SplitFetcherTest {
                try {
                        fetcher.runOnce();
 
-                       assertTrue(notifier.future().isDone());
+                       assertTrue(queue.getAvailabilityFuture().isDone());
                } finally {
                        queueDrainer.shutdown();
                }
@@ -136,16 +134,15 @@ public class SplitFetcherTest {
 
        @Test
        public void testNotifiesOlderFutureWhenGoingIdleConcurrent() throws 
Exception {
-               final FutureNotifier notifier = new FutureNotifier();
                final 
FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue =
-                       new FutureCompletingBlockingQueue<>(notifier);
+                               new FutureCompletingBlockingQueue<>();
                final SplitFetcher<Object, TestingSourceSplit> fetcher = 
createFetcherWithSplit(
-                       "test-split", queue, new 
TestingSplitReader<>(finishedSplitFetch("test-split")));
+                               "test-split", queue, new 
TestingSplitReader<>(finishedSplitFetch("test-split")));
 
                final QueueDrainerThread queueDrainer = new 
QueueDrainerThread(queue);
                queueDrainer.start();
 
-               final CompletableFuture<?> future = notifier.future();
+               final CompletableFuture<?> future = 
queue.getAvailabilityFuture();
 
                try {
                        fetcher.runOnce();
@@ -164,7 +161,7 @@ public class SplitFetcherTest {
                final int numTotalRecords = numRecordsPerSplit * numSplits;
 
                FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> 
elementQueue =
-                       new FutureCompletingBlockingQueue<>(new 
FutureNotifier(), 1);
+                       new FutureCompletingBlockingQueue<>(1);
                SplitFetcher<int[], MockSourceSplit> fetcher =
                                new SplitFetcher<>(
                                                0,
@@ -243,7 +240,7 @@ public class SplitFetcherTest {
 
        private static <E> SplitFetcher<E, TestingSourceSplit> createFetcher(
                        final SplitReader<E, TestingSourceSplit> reader) {
-               return createFetcher(reader, new 
FutureCompletingBlockingQueue<>(new FutureNotifier()));
+               return createFetcher(reader, new 
FutureCompletingBlockingQueue<>());
        }
 
        private static <E> SplitFetcher<E, TestingSourceSplit> createFetcher(
@@ -255,7 +252,7 @@ public class SplitFetcherTest {
        private static <E> SplitFetcher<E, TestingSourceSplit> 
createFetcherWithSplit(
                        final String splitId,
                        final SplitReader<E, TestingSourceSplit> reader) {
-               return createFetcherWithSplit(splitId, new 
FutureCompletingBlockingQueue<>(new FutureNotifier()), reader);
+               return createFetcherWithSplit(splitId, new 
FutureCompletingBlockingQueue<>(), reader);
        }
 
        private static <E> SplitFetcher<E, TestingSourceSplit> 
createFetcherWithSplit(
@@ -292,6 +289,7 @@ public class SplitFetcherTest {
                                        queue.take();
                                }
                                catch (InterruptedException ignored) {
+                                       Thread.currentThread().interrupt();
                                        // fall through the loop
                                }
                        }
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
index ae46286..2681e5a 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
@@ -30,7 +30,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import 
org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.util.InstantiationUtil;
 
@@ -68,15 +67,13 @@ public class MockBaseSource implements Source<Integer, 
MockSourceSplit, List<Moc
 
        @Override
        public SourceReader<Integer, MockSourceSplit> 
createReader(SourceReaderContext readerContext) {
-               FutureNotifier futureNotifier = new FutureNotifier();
                FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> 
elementsQueue =
-                               new 
FutureCompletingBlockingQueue<>(futureNotifier);
+                               new FutureCompletingBlockingQueue<>();
 
                Configuration config = new Configuration();
                config.setInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 
1);
                config.setLong(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 
30000L);
                return new MockSourceReader(
-                               futureNotifier,
                                elementsQueue,
                                () -> new MockSplitReader(2, true, true),
                                config,
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
index 92a19ef..66022db 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
@@ -25,7 +25,6 @@ import 
org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import 
org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
 
 import java.util.Collection;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -37,12 +36,11 @@ import java.util.function.Supplier;
 public class MockSourceReader
                extends SingleThreadMultiplexSourceReaderBase<int[], Integer, 
MockSourceSplit, AtomicInteger> {
 
-       public MockSourceReader(FutureNotifier futureNotifier,
-                                                       
FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue,
+       public 
MockSourceReader(FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> 
elementsQueue,
                                                        
Supplier<SplitReader<int[], MockSourceSplit>> splitFetcherSupplier,
                                                        Configuration config,
                                                        SourceReaderContext 
context) {
-               super(futureNotifier, elementsQueue, splitFetcherSupplier, new 
MockRecordEmitter(), config, context);
+               super(elementsQueue, splitFetcherSupplier, new 
MockRecordEmitter(), config, context);
        }
 
        @Override
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
index c1bde50..ef056d9e 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.connector.base.source.reader.synchronization;
 
+import org.apache.flink.runtime.io.AvailabilityProvider;
+
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -30,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -42,10 +45,9 @@ public class FutureCompletingBlockingQueueTest {
 
        @Test
        public void testBasics() throws InterruptedException {
-               FutureNotifier futureNotifier = new FutureNotifier();
-               FutureCompletingBlockingQueue<Integer> queue = new 
FutureCompletingBlockingQueue<>(futureNotifier, 5);
+               FutureCompletingBlockingQueue<Integer> queue = new 
FutureCompletingBlockingQueue<>(5);
 
-               CompletableFuture<Void> future = futureNotifier.future();
+               CompletableFuture<Void> future = queue.getAvailabilityFuture();
                assertTrue(queue.isEmpty());
                assertEquals(0, queue.size());
 
@@ -66,8 +68,7 @@ public class FutureCompletingBlockingQueueTest {
 
        @Test
        public void testPoll() throws InterruptedException {
-               FutureNotifier futureNotifier = new FutureNotifier();
-               FutureCompletingBlockingQueue<Integer> queue = new 
FutureCompletingBlockingQueue<>(futureNotifier);
+               FutureCompletingBlockingQueue<Integer> queue = new 
FutureCompletingBlockingQueue<>();
                queue.put(0, 1234);
                Integer value = queue.poll();
                assertNotNull(value);
@@ -76,8 +77,7 @@ public class FutureCompletingBlockingQueueTest {
 
        @Test
        public void testWakeUpPut() throws InterruptedException {
-               FutureNotifier futureNotifier = new FutureNotifier();
-               FutureCompletingBlockingQueue<Integer> queue = new 
FutureCompletingBlockingQueue<>(futureNotifier, 1);
+               FutureCompletingBlockingQueue<Integer> queue = new 
FutureCompletingBlockingQueue<>(1);
 
                CountDownLatch latch = new CountDownLatch(1);
                new Thread(() -> {
@@ -97,8 +97,7 @@ public class FutureCompletingBlockingQueueTest {
 
        @Test
        public void testConcurrency() throws InterruptedException {
-               FutureNotifier futureNotifier = new FutureNotifier();
-               FutureCompletingBlockingQueue<Integer> queue = new 
FutureCompletingBlockingQueue<>(futureNotifier, 5);
+               FutureCompletingBlockingQueue<Integer> queue = new 
FutureCompletingBlockingQueue<>(5);
                final int numValuesPerThread = 10000;
                final int numPuttingThreads = 5;
                List<Thread> threads = new ArrayList<>();
@@ -146,12 +145,21 @@ public class FutureCompletingBlockingQueueTest {
 
        @Test
        public void testFutureCompletingBlockingQueueConstructor() {
-               FutureNotifier notifier = new FutureNotifier();
-               FutureCompletingBlockingQueue<Object> 
defaultCapacityFutureCompletingBlockingQueue = new 
FutureCompletingBlockingQueue<>(notifier);
-               FutureCompletingBlockingQueue<Object> 
specifiedCapacityFutureCompletingBlockingQueue = new 
FutureCompletingBlockingQueue<>(notifier, SPECIFIED_CAPACITY);
+               FutureCompletingBlockingQueue<Object> 
defaultCapacityFutureCompletingBlockingQueue = new 
FutureCompletingBlockingQueue<>();
+               FutureCompletingBlockingQueue<Object> 
specifiedCapacityFutureCompletingBlockingQueue = new 
FutureCompletingBlockingQueue<>(SPECIFIED_CAPACITY);
                // The capacity of the queue needs to be equal to 10000
                
assertEquals(defaultCapacityFutureCompletingBlockingQueue.remainingCapacity(), 
(int) DEFAULT_CAPACITY);
                // The capacity of the queue needs to be equal to 
SPECIFIED_CAPACITY
                
assertEquals(specifiedCapacityFutureCompletingBlockingQueue.remainingCapacity(),
 (int) SPECIFIED_CAPACITY);
        }
+
+       /**
+        * This test is to guard that our reflection is not broken and we do 
not lose the
+        * performance advantage. This is possible, because the tests depend on 
the runtime modules
+        * while the main scope does not.
+        */
+       @Test
+       public void testQueueUsesShortCircuitFuture() {
+               assertSame(AvailabilityProvider.AVAILABLE, 
FutureCompletingBlockingQueue.AVAILABLE);
+       }
 }
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifierTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifierTest.java
deleted file mode 100644
index b257ebf..0000000
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifierTest.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.base.source.reader.synchronization;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * The unit tests for {@link FutureNotifier}.
- */
-public class FutureNotifierTest {
-
-       @Test
-       public void testGetFuture() {
-               FutureNotifier notifier = new FutureNotifier();
-               CompletableFuture<Void> future = notifier.future();
-               // The future should not be null.
-               assertNotNull(future);
-               // Calling the future again should return the same future.
-               assertEquals(future, notifier.future());
-       }
-
-       @Test
-       public void testCompleteFuture() {
-               FutureNotifier notifier = new FutureNotifier();
-               CompletableFuture<Void> future = notifier.future();
-               assertFalse(future.isDone());
-               notifier.notifyComplete();
-               assertTrue(future.isDone());
-       }
-
-       @Test
-       public void testConcurrency() throws InterruptedException, 
ExecutionException {
-               final int times = 1_000_000;
-               final int nThreads = 5;
-               FutureNotifier notifier = new FutureNotifier();
-               // A thread pool that simply gets futures out of the notifier.
-               ExecutorService listenerExecutor = 
Executors.newFixedThreadPool(nThreads);
-               // A thread pool that completes the futures.
-               ExecutorService notifierExecutor = 
Executors.newFixedThreadPool(nThreads);
-
-               CountDownLatch runningListeners = new CountDownLatch(nThreads);
-               CountDownLatch startCommand = new CountDownLatch(1);
-               CountDownLatch finishLine = new CountDownLatch(1);
-
-               List<Future<?>> executionFutures = new ArrayList<>();
-               // Start nThreads thread getting futures out of the notifier.
-               for (int i = 0; i < nThreads; i++) {
-                       executionFutures.add(listenerExecutor.submit(() -> {
-                               try {
-                                       List<CompletableFuture<Void>> futures = 
new ArrayList<>(times);
-                                       startCommand.await();
-                                       for (int j = 0; j < times; j++) {
-                                               futures.add(notifier.future());
-                                       }
-                                       runningListeners.countDown();
-                                       // Wait for the notifying thread to 
finish.
-                                       finishLine.await();
-                                       // All the futures should have been 
completed.
-                                       futures.forEach(f -> {
-                                               assertNotNull(f);
-                                               assertTrue(f.isDone());
-                                       });
-                               } catch (Exception e) {
-                                       fail();
-                               }
-                       }));
-               }
-
-               // Start nThreads thread notifying the completion.
-               for (int i = 0; i < nThreads; i++) {
-                       notifierExecutor.submit(() -> {
-                               try {
-                                       startCommand.await();
-                                       while (runningListeners.getCount() > 0) 
{
-                                               notifier.notifyComplete();
-                                       }
-                                       notifier.notifyComplete();
-                                       finishLine.countDown();
-                               } catch (Exception e) {
-                                       fail();
-                               }
-                       });
-               }
-
-               // Kick off the threads.
-               startCommand.countDown();
-
-               try {
-                       for (Future<?> executionFuture : executionFutures) {
-                               executionFuture.get();
-                       }
-               } finally {
-                       listenerExecutor.shutdown();
-                       notifierExecutor.shutdown();
-                       listenerExecutor.awaitTermination(30L, 
TimeUnit.SECONDS);
-                       notifierExecutor.awaitTermination(30L, 
TimeUnit.SECONDS);
-               }
-       }
-}

Reply via email to