This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0e821eaa483b0371ac1df1339f0d3c9ad7376976 Author: Stephan Ewen <[email protected]> AuthorDate: Mon Sep 14 23:48:01 2020 +0200 [FLINK-19223][connectors] Simplify Availability Future Model in Base Connector This implements a model closer to the AvailabilityListener and AvailabilityHelper in the flink-runtime. This closes #13385 --- .../SingleThreadMultiplexSourceReaderBase.java | 5 +- .../base/source/reader/SourceReaderBase.java | 21 +- .../reader/fetcher/SingleThreadFetcherManager.java | 4 +- .../source/reader/fetcher/SplitFetcherManager.java | 5 +- .../FutureCompletingBlockingQueue.java | 259 +++++++++++++++++---- .../reader/synchronization/FutureNotifier.java | 66 ------ .../base/source/reader/SourceReaderBaseTest.java | 13 +- .../source/reader/fetcher/SplitFetcherTest.java | 36 ++- .../base/source/reader/mocks/MockBaseSource.java | 5 +- .../base/source/reader/mocks/MockSourceReader.java | 6 +- .../FutureCompletingBlockingQueueTest.java | 32 ++- .../reader/synchronization/FutureNotifierTest.java | 131 ----------- 12 files changed, 261 insertions(+), 322 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java index 3239f28..ab87db0 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java @@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; -import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier; import java.util.function.Supplier; @@ -40,16 +39,14 @@ public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT extends extends SourceReaderBase<E, T, SplitT, SplitStateT> { public SingleThreadMultiplexSourceReaderBase( - FutureNotifier futureNotifier, FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, Supplier<SplitReader<E, SplitT>> splitReaderSupplier, RecordEmitter<E, T, SplitStateT> recordEmitter, Configuration config, SourceReaderContext context) { super( - futureNotifier, elementsQueue, - new SingleThreadFetcherManager<>(futureNotifier, elementsQueue, splitReaderSupplier), + new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier), recordEmitter, config, context); diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java index 0305e2d..fb4e6df9 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java @@ -29,7 +29,6 @@ import org.apache.flink.connector.base.source.event.NoMoreSplitsEvent; import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; -import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier; import org.apache.flink.core.io.InputStatus; import org.slf4j.Logger; @@ -61,9 +60,6 @@ public abstract class 
SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt implements SourceReader<T, SplitT> { private static final Logger LOG = LoggerFactory.getLogger(SourceReaderBase.class); - /** A future notifier to notify when this reader requires attention. */ - private final FutureNotifier futureNotifier; - /** A queue to buffer the elements fetched by the fetcher thread. */ private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue; @@ -94,13 +90,11 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt private boolean noMoreSplitsAssignment; public SourceReaderBase( - FutureNotifier futureNotifier, FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, SplitFetcherManager<E, SplitT> splitFetcherManager, RecordEmitter<E, T, SplitStateT> recordEmitter, Configuration config, SourceReaderContext context) { - this.futureNotifier = futureNotifier; this.elementsQueue = elementsQueue; this.splitFetcherManager = splitFetcherManager; this.recordEmitter = recordEmitter; @@ -203,18 +197,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt @Override public CompletableFuture<Void> isAvailable() { - // The order matters here. We first get the future. After this point, if the queue - // is empty or there is no error in the split fetcher manager, we can ensure that - // the future will be completed by the fetcher once it put an element into the element queue, - // or it will be completed when an error occurs. - CompletableFuture<Void> future = futureNotifier.future(); - splitFetcherManager.checkErrors(); - if (!elementsQueue.isEmpty()) { - // The fetcher got the new elements after the last poll, or their is a finished split. - // Simply complete the future and return; - futureNotifier.notifyComplete(); - } - return future; + return currentFetch != null ? 
FutureCompletingBlockingQueue.AVAILABLE : elementsQueue.getAvailabilityFuture(); } @Override @@ -239,7 +222,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt if (sourceEvent instanceof NoMoreSplitsEvent) { LOG.info("Reader received NoMoreSplits event."); noMoreSplitsAssignment = true; - futureNotifier.notifyComplete(); + elementsQueue.notifyAvailable(); } } diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java index bd5879f..339c533 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java @@ -22,7 +22,6 @@ import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; -import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier; import java.util.List; import java.util.function.Supplier; @@ -34,10 +33,9 @@ public class SingleThreadFetcherManager<E, SplitT extends SourceSplit> extends SplitFetcherManager<E, SplitT> { public SingleThreadFetcherManager( - FutureNotifier futureNotifier, FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, Supplier<SplitReader<E, SplitT>> splitReaderSupplier) { - super(futureNotifier, elementsQueue, splitReaderSupplier); + super(elementsQueue, splitReaderSupplier); } @Override diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java index 26d92e3..ffac523 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java @@ -23,7 +23,6 @@ import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.SourceReaderBase; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; -import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier; import org.apache.flink.util.ThrowableCatchingRunnable; import org.slf4j.Logger; @@ -79,12 +78,10 @@ public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> { /** * Create a split fetcher manager. * - * @param futureNotifier a notifier to notify the complete of a future. * @param elementsQueue the queue that split readers will put elements into. * @param splitReaderFactory a supplier that could be used to create split readers. */ public SplitFetcherManager( - FutureNotifier futureNotifier, FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, Supplier<SplitReader<E, SplitT>> splitReaderFactory) { this.elementsQueue = elementsQueue; @@ -96,7 +93,7 @@ public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> { // Add the exception to the exception list. uncaughtFetcherException.get().addSuppressed(t); // Wake up the main thread to let it know the exception. 
- futureNotifier.notifyComplete(); + elementsQueue.notifyAvailable(); } } }; diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java index dcbb66e..c89b682 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java @@ -18,62 +18,174 @@ package org.apache.flink.connector.base.source.reader.synchronization; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.concurrent.GuardedBy; + +import java.lang.reflect.Field; import java.util.ArrayDeque; import java.util.Arrays; import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import static org.apache.flink.util.Preconditions.checkArgument; + /** - * A custom implementation of blocking queue with the following features. - * <ul> - * <li> - * It allows a consuming thread to be notified asynchronously on element availability when the - * queue is empty. - * </li> - * <li> - * Allows the putting threads to be gracefully waken up without interruption. - * </li> - * </ul> + * A custom implementation of blocking queue in combination with a {@link CompletableFuture} that is + * used in the hand-over of data from a producing thread to a consuming thread. 
+ * This FutureCompletingBlockingQueue has the following features: + * + * <h3>Consumer Notifications</h3> + * + * <p>Rather than letting consumers block on the {@link #take()} method, or have them poll the + * {@link #poll()} method, this queue offers a {@link CompletableFuture} (obtained via the + * {@link #getAvailabilityFuture()} method) that gets completed whenever the queue is non-empty. + * A consumer can thus subscribe to asynchronous notifications for availability by adding a handler + * to the obtained {@code CompletableFuture}. + * + * <p>The future may also be completed by an explicit call to {@link #notifyAvailable()}. That way the + * consumer may be notified of a situation/condition without adding an element to the queue. + * + * <p>Availability is reset when a call to {@link #poll()} (or {@link #take()}) finds an empty queue + * or results in an empty queue (takes the last element). + * + * <p>Note that this model generally assumes that <i>false positives</i> are okay, meaning that the + * availability future completes despite there being no data available in the queue. The consumer is + * responsible for polling data and obtaining another future to wait on. This is similar to the way + * that Java's Monitors and Conditions can have the <i>spurious wakeup</i> of the waiting threads + * and commonly need to be used in a loop with the waiting condition. + * + * <h3>Producer Wakeup</h3> + * + * <p>The queue supports gracefully waking up producing threads that are blocked due to the queue + * capacity limits, without interrupting the thread. This is done via the {@link #wakeUpPuttingThread(int)} + * method. * * @param <T> the type of the elements in the queue. */ public class FutureCompletingBlockingQueue<T> { + + /** + * A constant future that is complete, indicating availability. Using this constant in cases that + * are guaranteed available helps short-circuiting some checks and avoiding volatile memory operations. 
+ */ + public static final CompletableFuture<Void> AVAILABLE = getAvailableFuture(); + + /** + * The default capacity for the queue. + */ + private static final int DEFAULT_CAPACITY = 1; + + // ------------------------------------------------------------------------ + + /** The maximum capacity of the queue. */ private final int capacity; - private final FutureNotifier futureNotifier; - /** The element queue. */ - private final Queue<T> queue; + /** The availability future. This doubles as a "non empty" condition. This value is never null.*/ + private CompletableFuture<Void> currentFuture; + /** The lock for synchronization. */ private final Lock lock; + + /** The element queue. */ + @GuardedBy("lock") + private final Queue<T> queue; + /** The per-thread conditions that are waiting on putting elements. */ + @GuardedBy("lock") private final Queue<Condition> notFull; - /** The shared conditions for getting elements. */ - private final Condition notEmpty; + /** The per-thread conditions and wakeUp flags. */ + @GuardedBy("lock") private ConditionAndFlag[] putConditionAndFlags; - /** - * The default capacity for the queue. 
- */ - private static final Integer DEFAULT_CAPACITY = 1; - - public FutureCompletingBlockingQueue(FutureNotifier futureNotifier) { - this(futureNotifier, DEFAULT_CAPACITY); + public FutureCompletingBlockingQueue() { + this(DEFAULT_CAPACITY); } - public FutureCompletingBlockingQueue(FutureNotifier futureNotifier, int capacity) { + public FutureCompletingBlockingQueue(int capacity) { + checkArgument(capacity > 0, "capacity must be > 0"); this.capacity = capacity; - this.futureNotifier = futureNotifier; this.queue = new ArrayDeque<>(capacity); this.lock = new ReentrantLock(); this.putConditionAndFlags = new ConditionAndFlag[1]; this.notFull = new ArrayDeque<>(); - this.notEmpty = lock.newCondition(); + + // initially the queue is empty and thus unavailable + this.currentFuture = new CompletableFuture<>(); + } + + // ------------------------------------------------------------------------ + // Future / Notification logic + // ------------------------------------------------------------------------ + + /** + * Returns the availability future. If the queue is non-empty, then this future will already + * be complete. Otherwise the obtained future is guaranteed to get completed the next time + * the queue becomes non-empty, or a notification happens via {@link #notifyAvailable()}. + * + * <p>It is important to note that a completed future is no guarantee that the next call to + * {@link #poll()} will return a non-null element. If there are concurrent consumers, another + * consumer may have taken the available element. Or there was no element in the first place, + * because the future was completed through a call to {@link #notifyAvailable()}. + * + * <p>For that reason, it is important to call this method (to obtain a new future) every + * time again after {@link #poll()} returned null and you want to wait for data. 
+ */ + public CompletableFuture<Void> getAvailabilityFuture() { + return currentFuture; + } + + /** + * Makes sure the availability future is complete, if it is not complete already. + * All futures returned by previous calls to {@link #getAvailabilityFuture()} are guaranteed to + * be completed. + * + * <p>All future calls to the method will return a completed future, until the point + * that the availability is reset via calls to {@link #poll()} that leave the queue empty. + */ + public void notifyAvailable() { + lock.lock(); + try { + moveToAvailable(); + } finally { + lock.unlock(); + } + } + + /** + * Internal utility to make sure that the current and future futures are complete (until reset). + */ + @GuardedBy("lock") + private void moveToAvailable() { + final CompletableFuture<Void> current = currentFuture; + if (current != AVAILABLE) { + currentFuture = AVAILABLE; + current.complete(null); + } } /** + * Makes sure the availability future is incomplete, if it was complete before. + */ + @GuardedBy("lock") + private void moveToUnAvailable() { + if (currentFuture == AVAILABLE) { + currentFuture = new CompletableFuture<>(); + } + } + + // ------------------------------------------------------------------------ + // Blocking Queue Logic + // ------------------------------------------------------------------------ + + /** * Put an element into the queue. The thread blocks if the queue is full. * * @param threadIndex the index of the thread. @@ -101,25 +213,40 @@ public class FutureCompletingBlockingQueue<T> { } /** - * Get and remove the first element from the queue. The call blocks if the queue is empty. + * <b>Warning:</b> This is a dangerous method and should only be used for testing convenience. + * A method that blocks until availability does not go together well with the concept of + * asynchronous notifications and non-blocking polling. + * + * <p>Get and remove the first element from the queue. The call blocks if the queue is empty. 
+ * The problem with this method is that it may loop internally until an element is available and + * that way eagerly reset the availability future. If a consumer thread is blocked in taking an + * element, it will receive availability notifications from {@link #notifyAvailable()} and immediately + * reset them by calling {@link #poll()} and finding the queue empty. * * @return the first element in the queue. * @throws InterruptedException when the thread is interrupted. */ - public T take() throws InterruptedException{ - lock.lock(); - try { - while (queue.size() == 0) { - notEmpty.await(); + @VisibleForTesting + public T take() throws InterruptedException { + T next; + while ((next = poll()) == null) { + // use the future to wait for availability to avoid busy waiting + try { + getAvailabilityFuture().get(); + } catch (ExecutionException | CompletionException e) { + // this should never happen, but we propagate just in case + throw new FlinkRuntimeException("exception in queue future completion", e); } - return dequeue(); - } finally { - lock.unlock(); } + return next; } /** - * Get and remove the first element from the queue. Null is retuned if the queue is empty. + * Get and remove the first element from the queue. Null is returned if the queue is empty. + * If this makes the queue empty (takes the last element) or finds the queue already empty, + * then this resets the availability notifications. The next call to {@link #getAvailabilityFuture()} + * will then return a non-complete future that completes only the next time that the queue + * becomes non-empty or the {@link #notifyAvailable()} method is called. * * @return the first element from the queue, or Null if the queue is empty. */ @@ -127,6 +254,7 @@ public class FutureCompletingBlockingQueue<T> { lock.lock(); try { if (queue.size() == 0) { + moveToUnAvailable(); return null; } return dequeue(); @@ -149,6 +277,9 @@ public class FutureCompletingBlockingQueue<T> { } } + /** + * Gets the size of the queue. 
+ */ public int size() { lock.lock(); try { @@ -158,6 +289,9 @@ public class FutureCompletingBlockingQueue<T> { } } + /** + * Checks whether the queue is empty. + */ public boolean isEmpty() { lock.lock(); try { @@ -167,6 +301,10 @@ public class FutureCompletingBlockingQueue<T> { } } + /** + * Checks the remaining capacity in the queue. That is the difference between the maximum capacity + * and the current number of elements in the queue. + */ public int remainingCapacity() { lock.lock(); try { @@ -176,6 +314,16 @@ public class FutureCompletingBlockingQueue<T> { } } + /** + * Gracefully wakes up the thread with the given {@code threadIndex} if it is blocked in + * adding an element to the queue. If the thread is blocked in {@link #put(int, Object)} it will + * immediately return from the method with a return value of false. + * + * <p>If this method is called, the next time the thread with the given index is about to be blocked + * in adding an element, it may immediately wake up and return. + * + * @param threadIndex The number identifying the thread. 
+ */ public void wakeUpPuttingThread(int threadIndex) { lock.lock(); try { @@ -190,36 +338,34 @@ public class FutureCompletingBlockingQueue<T> { } } - public void notifyAvailable() { - futureNotifier.notifyComplete(); - } - // --------------- private helpers ------------------------- + @GuardedBy("lock") private void enqueue(T element) { - int sizeBefore = queue.size(); + final int sizeBefore = queue.size(); queue.add(element); - futureNotifier.notifyComplete(); if (sizeBefore == 0) { - notEmpty.signal(); + moveToAvailable(); } if (sizeBefore < capacity - 1 && !notFull.isEmpty()) { signalNextPutter(); } } + @GuardedBy("lock") private T dequeue() { - int sizeBefore = queue.size(); - T element = queue.poll(); + final int sizeBefore = queue.size(); + final T element = queue.poll(); if (sizeBefore == capacity && !notFull.isEmpty()) { signalNextPutter(); } - if (sizeBefore > 1) { - notEmpty.signal(); + if (queue.isEmpty()) { + moveToUnAvailable(); } return element; } + @GuardedBy("lock") private void waitOnPut(int fetcherIndex) throws InterruptedException { maybeCreateCondition(fetcherIndex); Condition cond = putConditionAndFlags[fetcherIndex].condition(); @@ -227,12 +373,14 @@ public class FutureCompletingBlockingQueue<T> { cond.await(); } + @GuardedBy("lock") private void signalNextPutter() { if (!notFull.isEmpty()) { notFull.poll().signal(); } } + @GuardedBy("lock") private void maybeCreateCondition(int threadIndex) { if (putConditionAndFlags.length < threadIndex + 1) { putConditionAndFlags = Arrays.copyOf(putConditionAndFlags, threadIndex + 1); @@ -243,6 +391,7 @@ public class FutureCompletingBlockingQueue<T> { } } + @GuardedBy("lock") private boolean getAndResetWakeUpFlag(int threadIndex) { maybeCreateCondition(threadIndex); if (putConditionAndFlags[threadIndex].getWakeUp()) { @@ -275,4 +424,22 @@ public class FutureCompletingBlockingQueue<T> { wakeUp = value; } } + + // ------------------------------------------------------------------------ + // utilities + // 
------------------------------------------------------------------------ + + @SuppressWarnings("unchecked") + private static CompletableFuture<Void> getAvailableFuture() { + // this is a way to obtain the AvailabilityProvider.AVAILABLE future until we decide to + // move the class from the runtime module to the core module + try { + final Class<?> clazz = Class.forName("org.apache.flink.runtime.io.AvailabilityProvider"); + final Field field = clazz.getDeclaredField("AVAILABLE"); + return (CompletableFuture<Void>) field.get(null); + } + catch (Throwable t) { + return CompletableFuture.completedFuture(null); + } + } } diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifier.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifier.java deleted file mode 100644 index 9330407..0000000 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifier.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.connector.base.source.reader.synchronization; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicReference; - -/** - * A class facilitating the asynchronous communication among threads. - */ -public class FutureNotifier { - /** A future reference. */ - private final AtomicReference<CompletableFuture<Void>> futureRef; - - public FutureNotifier() { - this.futureRef = new AtomicReference<>(null); - } - - /** - * Get the future out of this notifier. The future will be completed when someone invokes - * {@link #notifyComplete()}. If there is already an uncompleted future, that existing - * future will be returned instead of a new one. - * - * @return a future that will be completed when {@link #notifyComplete()} is invoked. - */ - public CompletableFuture<Void> future() { - CompletableFuture<Void> prevFuture = futureRef.get(); - if (prevFuture != null) { - // Someone has created a future for us, don't create a new one. - return prevFuture; - } else { - CompletableFuture<Void> newFuture = new CompletableFuture<>(); - boolean newFutureSet = futureRef.compareAndSet(null, newFuture); - // If someone created a future after our previous check, use that future. - // Otherwise, use the new future. - return newFutureSet ? newFuture : future(); - } - } - - /** - * Complete the future if there is one. This will release the thread that is waiting for data. - */ - public void notifyComplete() { - CompletableFuture<Void> future = futureRef.get(); - // If there are multiple threads trying to complete the future, only the first one succeeds. 
- if (future != null && future.complete(null)) { - futureRef.compareAndSet(future, null); - } - } -} diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java index 0ec4297..84eeb4e 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java @@ -33,7 +33,6 @@ import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; -import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier; import org.junit.Rule; import org.junit.Test; @@ -62,12 +61,10 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> expectedException.expectMessage("One or more fetchers have encountered exception"); final String errMsg = "Testing Exception"; - FutureNotifier futureNotifier = new FutureNotifier(); FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue = - new FutureCompletingBlockingQueue<>(futureNotifier); + new FutureCompletingBlockingQueue<>(); // We have to handle split changes first, otherwise fetch will not be called. 
try (MockSourceReader reader = new MockSourceReader( - futureNotifier, elementsQueue, () -> new SplitReader<int[], MockSourceSplit>() { @Override @@ -127,13 +124,11 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> @Override protected MockSourceReader createReader() { - FutureNotifier futureNotifier = new FutureNotifier(); FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue = - new FutureCompletingBlockingQueue<>(futureNotifier); + new FutureCompletingBlockingQueue<>(); MockSplitReader mockSplitReader = new MockSplitReader(2, true, true); return new MockSourceReader( - futureNotifier, elementsQueue, () -> mockSplitReader, getConfig(), @@ -183,12 +178,10 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> final String splitId, final RecordsWithSplitIds<E> records) throws Exception { - final FutureNotifier futureNotifier = new FutureNotifier(); final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue = - new FutureCompletingBlockingQueue<>(futureNotifier); + new FutureCompletingBlockingQueue<>(); final SourceReader<E, TestingSourceSplit> reader = new SingleThreadMultiplexSourceReaderBase<E, E, TestingSourceSplit, TestingSourceSplit>( - futureNotifier, elementsQueue, () -> new TestingSplitReader<E, TestingSourceSplit>(records), new PassThroughRecordEmitter<E, TestingSourceSplit>(), diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java index 6e27d95..c25490b 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java @@ -26,7 +26,6 @@ import 
org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit; import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; -import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier; import org.apache.flink.core.testutils.CheckedThread; import org.junit.Test; @@ -84,28 +83,28 @@ public class SplitFetcherTest { @Test public void testNotifiesWhenGoingIdle() { - final FutureNotifier notifier = new FutureNotifier(); + final FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue = new FutureCompletingBlockingQueue<>(); final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit( "test-split", - new FutureCompletingBlockingQueue<>(notifier), + queue, new TestingSplitReader<>(finishedSplitFetch("test-split"))); fetcher.runOnce(); assertTrue(fetcher.assignedSplits().isEmpty()); assertTrue(fetcher.isIdle()); - assertTrue(notifier.future().isDone()); + assertTrue(queue.getAvailabilityFuture().isDone()); } @Test public void testNotifiesOlderFutureWhenGoingIdle() { - final FutureNotifier notifier = new FutureNotifier(); + final FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue = new FutureCompletingBlockingQueue<>(); final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit( - "test-split", - new FutureCompletingBlockingQueue<>(notifier), - new TestingSplitReader<>(finishedSplitFetch("test-split"))); + "test-split", + queue, + new TestingSplitReader<>(finishedSplitFetch("test-split"))); - final CompletableFuture<?> future = notifier.future(); + final CompletableFuture<?> future = queue.getAvailabilityFuture(); fetcher.runOnce(); @@ -116,9 +115,8 @@ public class SplitFetcherTest { @Test public void testNotifiesWhenGoingIdleConcurrent() throws Exception { - final FutureNotifier 
notifier = new FutureNotifier(); final FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue = - new FutureCompletingBlockingQueue<>(notifier); + new FutureCompletingBlockingQueue<>(); final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit( "test-split", queue, new TestingSplitReader<>(finishedSplitFetch("test-split"))); @@ -128,7 +126,7 @@ public class SplitFetcherTest { try { fetcher.runOnce(); - assertTrue(notifier.future().isDone()); + assertTrue(queue.getAvailabilityFuture().isDone()); } finally { queueDrainer.shutdown(); } @@ -136,16 +134,15 @@ public class SplitFetcherTest { @Test public void testNotifiesOlderFutureWhenGoingIdleConcurrent() throws Exception { - final FutureNotifier notifier = new FutureNotifier(); final FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue = - new FutureCompletingBlockingQueue<>(notifier); + new FutureCompletingBlockingQueue<>(); final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit( - "test-split", queue, new TestingSplitReader<>(finishedSplitFetch("test-split"))); + "test-split", queue, new TestingSplitReader<>(finishedSplitFetch("test-split"))); final QueueDrainerThread queueDrainer = new QueueDrainerThread(queue); queueDrainer.start(); - final CompletableFuture<?> future = notifier.future(); + final CompletableFuture<?> future = queue.getAvailabilityFuture(); try { fetcher.runOnce(); @@ -164,7 +161,7 @@ public class SplitFetcherTest { final int numTotalRecords = numRecordsPerSplit * numSplits; FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementQueue = - new FutureCompletingBlockingQueue<>(new FutureNotifier(), 1); + new FutureCompletingBlockingQueue<>(1); SplitFetcher<int[], MockSourceSplit> fetcher = new SplitFetcher<>( 0, @@ -243,7 +240,7 @@ public class SplitFetcherTest { private static <E> SplitFetcher<E, TestingSourceSplit> createFetcher( final SplitReader<E, TestingSourceSplit> reader) { - return createFetcher(reader, new 
FutureCompletingBlockingQueue<>(new FutureNotifier())); + return createFetcher(reader, new FutureCompletingBlockingQueue<>()); } private static <E> SplitFetcher<E, TestingSourceSplit> createFetcher( @@ -255,7 +252,7 @@ public class SplitFetcherTest { private static <E> SplitFetcher<E, TestingSourceSplit> createFetcherWithSplit( final String splitId, final SplitReader<E, TestingSourceSplit> reader) { - return createFetcherWithSplit(splitId, new FutureCompletingBlockingQueue<>(new FutureNotifier()), reader); + return createFetcherWithSplit(splitId, new FutureCompletingBlockingQueue<>(), reader); } private static <E> SplitFetcher<E, TestingSourceSplit> createFetcherWithSplit( @@ -292,6 +289,7 @@ public class SplitFetcherTest { queue.take(); } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); // fall through the loop } } diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java index ae46286..2681e5a 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java @@ -30,7 +30,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.SourceReaderOptions; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; -import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.util.InstantiationUtil; @@ -68,15 +67,13 @@ public class MockBaseSource implements Source<Integer, 
MockSourceSplit, List<Moc @Override public SourceReader<Integer, MockSourceSplit> createReader(SourceReaderContext readerContext) { - FutureNotifier futureNotifier = new FutureNotifier(); FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue = - new FutureCompletingBlockingQueue<>(futureNotifier); + new FutureCompletingBlockingQueue<>(); Configuration config = new Configuration(); config.setInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1); config.setLong(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 30000L); return new MockSourceReader( - futureNotifier, elementsQueue, () -> new MockSplitReader(2, true, true), config, diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java index 92a19ef..66022db 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java @@ -25,7 +25,6 @@ import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; -import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier; import java.util.Collection; import java.util.concurrent.atomic.AtomicInteger; @@ -37,12 +36,11 @@ import java.util.function.Supplier; public class MockSourceReader extends SingleThreadMultiplexSourceReaderBase<int[], Integer, MockSourceSplit, AtomicInteger> { - public MockSourceReader(FutureNotifier futureNotifier, - 
FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue, + public MockSourceReader(FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue, Supplier<SplitReader<int[], MockSourceSplit>> splitFetcherSupplier, Configuration config, SourceReaderContext context) { - super(futureNotifier, elementsQueue, splitFetcherSupplier, new MockRecordEmitter(), config, context); + super(elementsQueue, splitFetcherSupplier, new MockRecordEmitter(), config, context); } @Override diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java index c1bde50..ef056d9e 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java @@ -18,6 +18,8 @@ package org.apache.flink.connector.base.source.reader.synchronization; +import org.apache.flink.runtime.io.AvailabilityProvider; + import org.junit.Test; import java.util.ArrayList; @@ -30,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -42,10 +45,9 @@ public class FutureCompletingBlockingQueueTest { @Test public void testBasics() throws InterruptedException { - FutureNotifier futureNotifier = new FutureNotifier(); - FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier, 5); + 
FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(5); - CompletableFuture<Void> future = futureNotifier.future(); + CompletableFuture<Void> future = queue.getAvailabilityFuture(); assertTrue(queue.isEmpty()); assertEquals(0, queue.size()); @@ -66,8 +68,7 @@ public class FutureCompletingBlockingQueueTest { @Test public void testPoll() throws InterruptedException { - FutureNotifier futureNotifier = new FutureNotifier(); - FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier); + FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(); queue.put(0, 1234); Integer value = queue.poll(); assertNotNull(value); @@ -76,8 +77,7 @@ public class FutureCompletingBlockingQueueTest { @Test public void testWakeUpPut() throws InterruptedException { - FutureNotifier futureNotifier = new FutureNotifier(); - FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier, 1); + FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(1); CountDownLatch latch = new CountDownLatch(1); new Thread(() -> { @@ -97,8 +97,7 @@ public class FutureCompletingBlockingQueueTest { @Test public void testConcurrency() throws InterruptedException { - FutureNotifier futureNotifier = new FutureNotifier(); - FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier, 5); + FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(5); final int numValuesPerThread = 10000; final int numPuttingThreads = 5; List<Thread> threads = new ArrayList<>(); @@ -146,12 +145,21 @@ public class FutureCompletingBlockingQueueTest { @Test public void testFutureCompletingBlockingQueueConstructor() { - FutureNotifier notifier = new FutureNotifier(); - FutureCompletingBlockingQueue<Object> defaultCapacityFutureCompletingBlockingQueue = new FutureCompletingBlockingQueue<>(notifier); - 
FutureCompletingBlockingQueue<Object> specifiedCapacityFutureCompletingBlockingQueue = new FutureCompletingBlockingQueue<>(notifier, SPECIFIED_CAPACITY); + FutureCompletingBlockingQueue<Object> defaultCapacityFutureCompletingBlockingQueue = new FutureCompletingBlockingQueue<>(); + FutureCompletingBlockingQueue<Object> specifiedCapacityFutureCompletingBlockingQueue = new FutureCompletingBlockingQueue<>(SPECIFIED_CAPACITY); // The capacity of the queue needs to be equal to 10000 assertEquals(defaultCapacityFutureCompletingBlockingQueue.remainingCapacity(), (int) DEFAULT_CAPACITY); // The capacity of the queue needs to be equal to SPECIFIED_CAPACITY assertEquals(specifiedCapacityFutureCompletingBlockingQueue.remainingCapacity(), (int) SPECIFIED_CAPACITY); } + + /** + * This test is to guard that our reflection is not broken and we do not lose the + * performance advantage. This is possible, because the tests depend on the runtime modules + * while the main scope does not. + */ + @Test + public void testQueueUsesShortCircuitFuture() { + assertSame(AvailabilityProvider.AVAILABLE, FutureCompletingBlockingQueue.AVAILABLE); + } } diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifierTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifierTest.java deleted file mode 100644 index b257ebf..0000000 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifierTest.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connector.base.source.reader.synchronization; - -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * The unit tests for {@link FutureNotifier}. - */ -public class FutureNotifierTest { - - @Test - public void testGetFuture() { - FutureNotifier notifier = new FutureNotifier(); - CompletableFuture<Void> future = notifier.future(); - // The future should not be null. - assertNotNull(future); - // Calling the future again should return the same future. 
- assertEquals(future, notifier.future()); - } - - @Test - public void testCompleteFuture() { - FutureNotifier notifier = new FutureNotifier(); - CompletableFuture<Void> future = notifier.future(); - assertFalse(future.isDone()); - notifier.notifyComplete(); - assertTrue(future.isDone()); - } - - @Test - public void testConcurrency() throws InterruptedException, ExecutionException { - final int times = 1_000_000; - final int nThreads = 5; - FutureNotifier notifier = new FutureNotifier(); - // A thread pool that simply gets futures out of the notifier. - ExecutorService listenerExecutor = Executors.newFixedThreadPool(nThreads); - // A thread pool that completes the futures. - ExecutorService notifierExecutor = Executors.newFixedThreadPool(nThreads); - - CountDownLatch runningListeners = new CountDownLatch(nThreads); - CountDownLatch startCommand = new CountDownLatch(1); - CountDownLatch finishLine = new CountDownLatch(1); - - List<Future<?>> executionFutures = new ArrayList<>(); - // Start nThreads thread getting futures out of the notifier. - for (int i = 0; i < nThreads; i++) { - executionFutures.add(listenerExecutor.submit(() -> { - try { - List<CompletableFuture<Void>> futures = new ArrayList<>(times); - startCommand.await(); - for (int j = 0; j < times; j++) { - futures.add(notifier.future()); - } - runningListeners.countDown(); - // Wait for the notifying thread to finish. - finishLine.await(); - // All the futures should have been completed. - futures.forEach(f -> { - assertNotNull(f); - assertTrue(f.isDone()); - }); - } catch (Exception e) { - fail(); - } - })); - } - - // Start nThreads thread notifying the completion. - for (int i = 0; i < nThreads; i++) { - notifierExecutor.submit(() -> { - try { - startCommand.await(); - while (runningListeners.getCount() > 0) { - notifier.notifyComplete(); - } - notifier.notifyComplete(); - finishLine.countDown(); - } catch (Exception e) { - fail(); - } - }); - } - - // Kick off the threads. 
- startCommand.countDown(); - - try { - for (Future<?> executionFuture : executionFutures) { - executionFuture.get(); - } - } finally { - listenerExecutor.shutdown(); - notifierExecutor.shutdown(); - listenerExecutor.awaitTermination(30L, TimeUnit.SECONDS); - notifierExecutor.awaitTermination(30L, TimeUnit.SECONDS); - } - } -}
