This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 997fc5f0921abf89cac6fb0ccbf0da2aaf669228 Author: Jiangjie (Becket) Qin <[email protected]> AuthorDate: Mon Sep 14 23:53:21 2020 +0200 [FLINK-17393][connectors] Wakeup the SplitFetchers more elegantly. This closes #13366 --- .../base/source/reader/SourceReaderBase.java | 3 +- .../base/source/reader/fetcher/FetchTask.java | 30 +-- .../base/source/reader/fetcher/SplitFetcher.java | 26 +- .../source/reader/fetcher/SplitFetcherManager.java | 3 +- .../FutureCompletingBlockingQueue.java | 268 +++++++++++++++++---- .../source/reader/fetcher/SplitFetcherTest.java | 8 +- .../source/reader/mocks/TestingSplitReader.java | 13 +- .../FutureCompletingBlockingQueueTest.java | 117 ++++++++- 8 files changed, 378 insertions(+), 90 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java index 02b7a7c..979afb2 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java @@ -43,7 +43,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import static org.apache.flink.util.Preconditions.checkState; @@ -66,7 +65,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt private final FutureNotifier futureNotifier; /** A queue to buffer the elements fetched by the fetcher thread. */ - private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue; + private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue; /** The state of the splits. */ private final Map<String, SplitContext<T, SplitStateT>> splitStates; diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java index 30835ce..530add1 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java @@ -21,10 +21,10 @@ package org.apache.flink.connector.base.source.reader.fetcher; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; import java.io.IOException; import java.util.Collection; -import java.util.concurrent.BlockingQueue; import java.util.function.Consumer; /** @@ -32,22 +32,22 @@ import java.util.function.Consumer; */ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask { private final SplitReader<E, SplitT> splitReader; - private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue; + private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue; private final Consumer<Collection<String>> splitFinishedCallback; - private final Thread runningThread; + private final int fetcherIndex; private volatile RecordsWithSplitIds<E> lastRecords; private volatile boolean wakeup; FetchTask( - SplitReader<E, SplitT> splitReader, - BlockingQueue<RecordsWithSplitIds<E>> elementsQueue, - Consumer<Collection<String>> splitFinishedCallback, - Thread runningThread) { + SplitReader<E, SplitT> splitReader, + FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, + Consumer<Collection<String>> splitFinishedCallback, + int fetcherIndex) { this.splitReader = splitReader; this.elementsQueue = elementsQueue; this.splitFinishedCallback = splitFinishedCallback; this.lastRecords = null; - this.runningThread = runningThread; + this.fetcherIndex = fetcherIndex; this.wakeup = false; } @@ -61,10 +61,11 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask { if (!isWakenUp()) { // The order matters here. We must first put the last records into the queue. // This ensures the handling of the fetched records is atomic to wakeup. - elementsQueue.put(lastRecords); - // The callback does not throw InterruptedException. - splitFinishedCallback.accept(lastRecords.finishedSplits()); - lastRecords = null; + if (elementsQueue.put(fetcherIndex, lastRecords)) { + // The callback does not throw InterruptedException. + splitFinishedCallback.accept(lastRecords.finishedSplits()); + lastRecords = null; + } } } finally { // clean up the potential wakeup effect. It is possible that the fetcher is waken up @@ -72,7 +73,6 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask { // running thread will be interrupted. The next invocation of run() will see that and // just skip. if (isWakenUp()) { - Thread.interrupted(); wakeup = false; } } @@ -93,12 +93,12 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask { splitReader.wakeUp(); } else { // The task might be blocking on enqueuing the records, just interrupt. - runningThread.interrupt(); + elementsQueue.wakeUpPuttingThread(fetcherIndex); } } private boolean isWakenUp() { - return wakeup || runningThread.isInterrupted(); + return wakeup; } @Override diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java index 35deeba..fa1442e 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java @@ -22,6 +22,7 @@ import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,7 +34,6 @@ import java.util.List; import java.util.Map; import java.util.Queue; import java.util.concurrent.BlockingDeque; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicBoolean; @@ -50,21 +50,20 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { private final Map<String, SplitT> assignedSplits; /** The current split assignments for this fetcher. */ private final Queue<SplitsChange<SplitT>> splitChanges; - private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue; + private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue; private final SplitReader<E, SplitT> splitReader; private final Runnable shutdownHook; private final AtomicBoolean wakeUp; private final AtomicBoolean closed; private FetchTask<E, SplitT> fetchTask; - private volatile Thread runningThread; private volatile SplitFetcherTask runningTask = null; private volatile boolean isIdle; SplitFetcher( - int id, - BlockingQueue<RecordsWithSplitIds<E>> elementsQueue, - SplitReader<E, SplitT> splitReader, - Runnable shutdownHook) { + int id, + FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, + SplitReader<E, SplitT> splitReader, + Runnable shutdownHook) { this.id = id; this.taskQueue = new LinkedBlockingDeque<>(); @@ -83,14 +82,13 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { LOG.info("Starting split fetcher {}", id); try { // Remove the split from the assignments if it is already done. - runningThread = Thread.currentThread(); this.fetchTask = new FetchTask<>( - splitReader, - elementsQueue, - ids -> { - ids.forEach(this::removeAssignedSplit); - updateIsIdle(); - }, runningThread); + splitReader, + elementsQueue, + ids -> { + ids.forEach(assignedSplits::remove); + updateIsIdle(); + }, id); while (!closed.get()) { runOnce(); } diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java index 61bada1..822a9a9 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java @@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -66,7 +65,7 @@ public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> { private final AtomicReference<Throwable> uncaughtFetcherException; /** The element queue that the split fetchers will put elements into. */ - private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue; + private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue; /** A map keeping track of all the split fetchers. */ protected final Map<Integer, SplitFetcher<E, SplitT>> fetchers; diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java index de51af1..ea0f030 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java @@ -18,83 +18,257 @@ package org.apache.flink.connector.base.source.reader.synchronization; -import java.util.Collection; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Queue; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** - * A BlockingQueue that allows a consuming thread to be notified asynchronously on element - * availability when the queue is empty. - * - * <p>Implementation wise, it is a subclass of {@link LinkedBlockingQueue} that ensures all - * the methods adding elements into the queue will complete the elements availability future. - * - * <p>The overriding methods must first put the elements into the queue then check and complete - * the future if needed. This is required to ensure the thread waiting for more messages will - * not lose a notification. + * A custom implementation of blocking queue with the following features. + * <ul> + * <li> + * It allows a consuming thread to be notified asynchronously on element availability when the + * queue is empty. + * </li> + * <li> + * Allows the putting threads to be gracefully waken up without interruption. + * </li> + * </ul> * * @param <T> the type of the elements in the queue. */ -public class FutureCompletingBlockingQueue<T> extends LinkedBlockingQueue<T> { - +public class FutureCompletingBlockingQueue<T> { + private final int capacity; private final FutureNotifier futureNotifier; + + /** The element queue. */ + private final Queue<T> queue; + /** The lock for synchronization. */ + private final Lock lock; + /** The per-thread conditions that are waiting on putting elements. */ + private final Queue<Condition> notFull; + /** The shared conditions for getting elements. */ + private final Condition notEmpty; + /** The per-thread conditions and wakeUp flags. */ + private ConditionAndFlag[] putConditionAndFlags; + /** - * The default capacity for {@link LinkedBlockingQueue}. + * The default capacity for the queue. */ - private static final Integer DEFAULT_CAPACITY = 10000; + private static final Integer DEFAULT_CAPACITY = 1; public FutureCompletingBlockingQueue(FutureNotifier futureNotifier) { this(futureNotifier, DEFAULT_CAPACITY); } public FutureCompletingBlockingQueue(FutureNotifier futureNotifier, int capacity) { - super(capacity); + this.capacity = capacity; this.futureNotifier = futureNotifier; + this.queue = new ArrayDeque<>(capacity); + this.lock = new ReentrantLock(); + this.putConditionAndFlags = new ConditionAndFlag[1]; + this.notFull = new ArrayDeque<>(); + this.notEmpty = lock.newCondition(); + } + + /** + * Put an element into the queue. The thread blocks if the queue is full. + * + * @param threadIndex the index of the thread. + * @param element the element to put. + * @return true if the element has been successfully put into the queue, false otherwise. + * @throws InterruptedException when the thread is interrupted. + */ + public boolean put(int threadIndex, T element) throws InterruptedException { + if (element == null) { + throw new NullPointerException(); + } + lock.lockInterruptibly(); + try { + while (queue.size() >= capacity) { + if (getAndResetWakeUpFlag(threadIndex)) { + return false; + } + waitOnPut(threadIndex); + } + enqueue(element); + return true; + } finally { + lock.unlock(); + } } - @Override - public void put(T t) throws InterruptedException { - super.put(t); + /** + * Get and remove the first element from the queue. The call blocks if the queue is empty. + * + * @return the first element in the queue. + * @throws InterruptedException when the thread is interrupted. + */ + public T take() throws InterruptedException{ + lock.lock(); + try { + while (queue.size() == 0) { + notEmpty.await(); + } + return dequeue(); + } finally { + lock.unlock(); + } + } + + /** + * Get and remove the first element from the queue. Null is retuned if the queue is empty. + * + * @return the first element from the queue, or Null if the queue is empty. + */ + public T poll() { + lock.lock(); + try { + if (queue.size() == 0) { + return null; + } + return dequeue(); + } finally { + lock.unlock(); + } + } + + /** + * Get the first element from the queue without removing it. + * + * @return the first element in the queue, or Null if the queue is empty. + */ + public T peek() { + lock.lock(); + try { + return queue.peek(); + } finally { + lock.unlock(); + } + } + + public int size() { + lock.lock(); + try { + return queue.size(); + } finally { + lock.unlock(); + } + } + + public boolean isEmpty() { + lock.lock(); + try { + return queue.isEmpty(); + } finally { + lock.unlock(); + } + } + + public int remainingCapacity() { + lock.lock(); + try { + return capacity - queue.size(); + } finally { + lock.unlock(); + } + } + + public void wakeUpPuttingThread(int threadIndex) { + lock.lock(); + try { + maybeCreateCondition(threadIndex); + ConditionAndFlag caf = putConditionAndFlags[threadIndex]; + if (caf != null) { + caf.setWakeUp(true); + caf.condition().signal(); + } + } finally { + lock.unlock(); + } + } + + // --------------- private helpers ------------------------- + + private void enqueue(T element) { + int sizeBefore = queue.size(); + queue.add(element); futureNotifier.notifyComplete(); + if (sizeBefore == 0) { + notEmpty.signal(); + } + if (sizeBefore < capacity - 1 && !notFull.isEmpty()) { + signalNextPutter(); + } } - @Override - public boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException { - if (super.offer(t, timeout, unit)) { - futureNotifier.notifyComplete(); - return true; - } else { - return false; + private T dequeue() { + int sizeBefore = queue.size(); + T element = queue.poll(); + if (sizeBefore == capacity && !notFull.isEmpty()) { + signalNextPutter(); } + if (sizeBefore > 1) { + notEmpty.signal(); + } + return element; } - @Override - public boolean offer(T t) { - if (super.offer(t)) { - futureNotifier.notifyComplete(); - return true; - } else { - return false; + private void waitOnPut(int fetcherIndex) throws InterruptedException { + maybeCreateCondition(fetcherIndex); + Condition cond = putConditionAndFlags[fetcherIndex].condition(); + notFull.add(cond); + cond.await(); + } + + private void signalNextPutter() { + if (!notFull.isEmpty()) { + notFull.poll().signal(); } } - @Override - public boolean add(T t) { - if (super.add(t)) { - futureNotifier.notifyComplete(); - return true; - } else { - return false; + private void maybeCreateCondition(int threadIndex) { + if (putConditionAndFlags.length < threadIndex + 1) { + putConditionAndFlags = Arrays.copyOf(putConditionAndFlags, threadIndex + 1); + } + + if (putConditionAndFlags[threadIndex] == null) { + putConditionAndFlags[threadIndex] = new ConditionAndFlag(lock.newCondition()); } } - @Override - public boolean addAll(Collection<? extends T> c) { - if (super.addAll(c)) { - futureNotifier.notifyComplete(); + private boolean getAndResetWakeUpFlag(int threadIndex) { + maybeCreateCondition(threadIndex); + if (putConditionAndFlags[threadIndex].getWakeUp()) { + putConditionAndFlags[threadIndex].setWakeUp(false); return true; - } else { - return false; + } + return false; + } + + // --------------- private per thread state ------------ + + private static class ConditionAndFlag { + private final Condition cond; + private boolean wakeUp; + + private ConditionAndFlag(Condition cond) { + this.cond = cond; + this.wakeUp = false; + } + + private Condition condition() { + return cond; + } + + private boolean getWakeUp() { + return wakeUp; + } + + private void setWakeUp(boolean value) { + wakeUp = value; } } } diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java index e9c2ad2..eef8328 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java @@ -21,6 +21,8 @@ package org.apache.flink.connector.base.source.reader.fetcher; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier; import org.junit.Test; @@ -29,8 +31,6 @@ import java.util.Collections; import java.util.List; import java.util.SortedSet; import java.util.TreeSet; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -45,9 +45,11 @@ public class SplitFetcherTest { private static final int NUM_RECORDS_PER_SPLIT = 10_000; private static final int INTERRUPT_RECORDS_INTERVAL = 10; private static final int NUM_TOTAL_RECORDS = NUM_RECORDS_PER_SPLIT * NUM_SPLITS; + @Test public void testWakeup() throws InterruptedException { - BlockingQueue<RecordsWithSplitIds<int[]>> elementQueue = new ArrayBlockingQueue<>(1); + FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementQueue = + new FutureCompletingBlockingQueue<>(new FutureNotifier(), 1); SplitFetcher<int[], MockSourceSplit> fetcher = new SplitFetcher<>( 0, diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java index ede92eb..0d202f7 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java @@ -46,11 +46,10 @@ public class TestingSplitReader<E, SplitT extends SourceSplit> implements SplitR if (!fetches.isEmpty()) { return fetches.removeFirst(); } else { - // block until interrupted + // block until woken up synchronized (fetches) { - while (true) { - fetches.wait(); - } + fetches.wait(); + return null; } } } @@ -61,5 +60,9 @@ public class TestingSplitReader<E, SplitT extends SourceSplit> implements SplitR } @Override - public void wakeUp() {} + public void wakeUp() { + synchronized (fetches) { + fetches.notifyAll(); + } + } } diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java index ad74f2a..c1bde50 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java @@ -20,16 +20,129 @@ package org.apache.flink.connector.base.source.reader.synchronization; import org.junit.Test; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * The unit test for {@link FutureCompletingBlockingQueue}. */ public class FutureCompletingBlockingQueueTest { + private static final Integer DEFAULT_CAPACITY = 1; + private static final Integer SPECIFIED_CAPACITY = 20000; + + @Test + public void testBasics() throws InterruptedException { + FutureNotifier futureNotifier = new FutureNotifier(); + FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier, 5); + CompletableFuture<Void> future = futureNotifier.future(); + assertTrue(queue.isEmpty()); + assertEquals(0, queue.size()); - private static final Integer DEFAULT_CAPACITY = 10000; - private static final Integer SPECIFIED_CAPACITY = 20000; + queue.put(0, 1234); + + assertTrue(future.isDone()); + assertEquals(1, queue.size()); + assertFalse(queue.isEmpty()); + assertEquals(4, queue.remainingCapacity()); + assertNotNull(queue.peek()); + assertEquals(1234, (int) queue.peek()); + assertEquals(1234, (int) queue.poll()); + + assertEquals(0, queue.size()); + assertTrue(queue.isEmpty()); + assertEquals(5, queue.remainingCapacity()); + } + + @Test + public void testPoll() throws InterruptedException { + FutureNotifier futureNotifier = new FutureNotifier(); + FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier); + queue.put(0, 1234); + Integer value = queue.poll(); + assertNotNull(value); + assertEquals(1234, (int) value); + } + + @Test + public void testWakeUpPut() throws InterruptedException { + FutureNotifier futureNotifier = new FutureNotifier(); + FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier, 1); + + CountDownLatch latch = new CountDownLatch(1); + new Thread(() -> { + try { + assertTrue(queue.put(0, 1234)); + assertFalse(queue.put(0, 1234)); + latch.countDown(); + } catch (InterruptedException e) { + fail("Interrupted unexpectedly."); + } + }).start(); + + queue.wakeUpPuttingThread(0); + latch.await(); + assertEquals(0, latch.getCount()); + } + + @Test + public void testConcurrency() throws InterruptedException { + FutureNotifier futureNotifier = new FutureNotifier(); + FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier, 5); + final int numValuesPerThread = 10000; + final int numPuttingThreads = 5; + List<Thread> threads = new ArrayList<>(); + + for (int i = 0; i < numPuttingThreads; i++) { + final int index = i; + Thread t = new Thread(() -> { + for (int j = 0; j < numValuesPerThread; j++) { + int base = index * numValuesPerThread; + try { + queue.put(index, base + j); + } catch (InterruptedException e) { + fail("putting thread interrupted."); + } + } + }); + t.start(); + threads.add(t); + } + + BitSet bitSet = new BitSet(); + AtomicInteger count = new AtomicInteger(0); + for (int i = 0; i < 5; i++) { + Thread t = new Thread(() -> { + while (count.get() < numPuttingThreads * numValuesPerThread) { + Integer value = queue.poll(); + if (value == null) { + continue; + } + count.incrementAndGet(); + if (bitSet.get(value)) { + fail("Value " + value + " has been consumed before"); + } + synchronized (bitSet) { + bitSet.set(value); + } + }}); + t.start(); + threads.add(t); + } + for (Thread t : threads) { + t.join(); + } + } @Test public void testFutureCompletingBlockingQueueConstructor() {
