This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit e72e48533902fe6a7271310736584e77b64d05b8 Author: Stephan Ewen <[email protected]> AuthorDate: Mon Sep 14 20:55:57 2020 +0200 [FLINK-18128][connectors] Ensure idle split fetchers lead to availability notifications. --- .../base/source/reader/fetcher/SplitFetcher.java | 23 ++- .../FutureCompletingBlockingQueue.java | 4 + .../source/reader/fetcher/SplitFetcherTest.java | 185 +++++++++++++++++++++ 3 files changed, 206 insertions(+), 6 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java index 289dc34..3beb0da 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java @@ -57,6 +57,10 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { private final AtomicBoolean closed; private FetchTask<E, SplitT> fetchTask; private volatile SplitFetcherTask runningTask = null; + + /** Flag whether this fetcher has no work assigned at the moment. + * Fetchers that have work (a split) assigned but are currently blocked (for example enqueueing + * a fetch and hitting the element queue limit) are NOT considered idle. 
*/ private volatile boolean isIdle; SplitFetcher( @@ -81,7 +85,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { elementsQueue, ids -> { ids.forEach(assignedSplits::remove); - updateIsIdle(); + checkAndSetIdle(); }, id); } @@ -168,7 +172,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { */ public void addSplits(List<SplitT> splitsToAdd) { maybeEnqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, splitChanges, assignedSplits)); - updateIsIdle(); + isIdle = false; // in case we were idle before wakeUp(true); } @@ -292,6 +296,17 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { } + private void checkAndSetIdle() { + final boolean nowIdle = assignedSplits.isEmpty() && taskQueue.isEmpty() && splitChanges.isEmpty(); + if (nowIdle) { + isIdle = true; + + // because the method might get invoked past the point when the source reader last checked + // the elements queue, we need to notify availability in the case when we become idle + elementsQueue.notifyAvailable(); + } + } + //--------------------- Helper class ------------------ private static class DummySplitFetcherTask implements SplitFetcherTask { @@ -316,8 +331,4 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { return name; } } - - private void updateIsIdle() { - isIdle = taskQueue.isEmpty() && splitChanges.isEmpty() && assignedSplits.isEmpty(); - } } diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java index ea0f030..dcbb66e 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java +++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java @@ -190,6 +190,10 @@ public class FutureCompletingBlockingQueue<T> { } } + public void notifyAvailable() { + futureNotifier.notifyComplete(); + } + // --------------- private helpers ------------------------- private void enqueue(T element) { diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java index 4fa99dd..6e27d95 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java @@ -19,10 +19,15 @@ package org.apache.flink.connector.base.source.reader.fetcher; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; +import org.apache.flink.connector.base.source.reader.RecordsBySplits; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader; +import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit; +import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier; +import org.apache.flink.core.testutils.CheckedThread; import org.junit.Test; @@ -31,10 +36,12 @@ import java.util.Collections; import java.util.List; import java.util.SortedSet; import java.util.TreeSet; +import 
java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; /** @@ -43,6 +50,113 @@ import static org.junit.Assert.assertTrue; public class SplitFetcherTest { @Test + public void testNewFetcherIsIdle() { + final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcher(new TestingSplitReader<>()); + assertTrue(fetcher.isIdle()); + } + + @Test + public void testFetcherNotIdleAfterSplitAdded() { + final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcher(new TestingSplitReader<>()); + final TestingSourceSplit split = new TestingSourceSplit("test-split"); + + fetcher.addSplits(Collections.singletonList(split)); + + assertFalse(fetcher.isIdle()); + + // need to loop here because the internal wakeup flag handling means we need multiple loops + while (fetcher.assignedSplits().isEmpty()) { + fetcher.runOnce(); + assertFalse(fetcher.isIdle()); + } + } + + @Test + public void testIdleAfterFinishedSplitsEnqueued() { + final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit( + "test-split", new TestingSplitReader<>(finishedSplitFetch("test-split"))); + + fetcher.runOnce(); + + assertTrue(fetcher.assignedSplits().isEmpty()); + assertTrue(fetcher.isIdle()); + } + + @Test + public void testNotifiesWhenGoingIdle() { + final FutureNotifier notifier = new FutureNotifier(); + final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit( + "test-split", + new FutureCompletingBlockingQueue<>(notifier), + new TestingSplitReader<>(finishedSplitFetch("test-split"))); + + fetcher.runOnce(); + + assertTrue(fetcher.assignedSplits().isEmpty()); + assertTrue(fetcher.isIdle()); + assertTrue(notifier.future().isDone()); + } + + @Test + public void testNotifiesOlderFutureWhenGoingIdle() { + final FutureNotifier notifier = 
new FutureNotifier(); + final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit( + "test-split", + new FutureCompletingBlockingQueue<>(notifier), + new TestingSplitReader<>(finishedSplitFetch("test-split"))); + + final CompletableFuture<?> future = notifier.future(); + + fetcher.runOnce(); + + assertTrue(fetcher.assignedSplits().isEmpty()); + assertTrue(fetcher.isIdle()); + assertTrue(future.isDone()); + } + + @Test + public void testNotifiesWhenGoingIdleConcurrent() throws Exception { + final FutureNotifier notifier = new FutureNotifier(); + final FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue = + new FutureCompletingBlockingQueue<>(notifier); + final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit( + "test-split", queue, new TestingSplitReader<>(finishedSplitFetch("test-split"))); + + final QueueDrainerThread queueDrainer = new QueueDrainerThread(queue); + queueDrainer.start(); + + try { + fetcher.runOnce(); + + assertTrue(notifier.future().isDone()); + } finally { + queueDrainer.shutdown(); + } + } + + @Test + public void testNotifiesOlderFutureWhenGoingIdleConcurrent() throws Exception { + final FutureNotifier notifier = new FutureNotifier(); + final FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue = + new FutureCompletingBlockingQueue<>(notifier); + final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit( + "test-split", queue, new TestingSplitReader<>(finishedSplitFetch("test-split"))); + + final QueueDrainerThread queueDrainer = new QueueDrainerThread(queue); + queueDrainer.start(); + + final CompletableFuture<?> future = notifier.future(); + + try { + fetcher.runOnce(); + + assertTrue(future.isDone()); + } finally { + queueDrainer.shutdown(); + } + } + + @Test public void testWakeup() throws InterruptedException { final int numSplits = 3; final int numRecordsPerSplit = 10_000; @@ -118,4 +232,75 @@ public class SplitFetcherTest { interrupter.join(); } 
} + + // ------------------------------------------------------------------------ + // testing utils + // ------------------------------------------------------------------------ + + private static <E> RecordsBySplits<E> finishedSplitFetch(String splitId) { + return new RecordsBySplits<>(Collections.emptyMap(), Collections.singleton(splitId)); + } + + private static <E> SplitFetcher<E, TestingSourceSplit> createFetcher( + final SplitReader<E, TestingSourceSplit> reader) { + return createFetcher(reader, new FutureCompletingBlockingQueue<>(new FutureNotifier())); + } + + private static <E> SplitFetcher<E, TestingSourceSplit> createFetcher( + final SplitReader<E, TestingSourceSplit> reader, + final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> queue) { + return new SplitFetcher<>(0, queue, reader, () -> {}); + } + + private static <E> SplitFetcher<E, TestingSourceSplit> createFetcherWithSplit( + final String splitId, + final SplitReader<E, TestingSourceSplit> reader) { + return createFetcherWithSplit(splitId, new FutureCompletingBlockingQueue<>(new FutureNotifier()), reader); + } + + private static <E> SplitFetcher<E, TestingSourceSplit> createFetcherWithSplit( + final String splitId, + final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> queue, + final SplitReader<E, TestingSourceSplit> reader) { + + final SplitFetcher<E, TestingSourceSplit> fetcher = createFetcher(reader, queue); + + fetcher.addSplits(Collections.singletonList(new TestingSourceSplit(splitId))); + while (fetcher.assignedSplits().isEmpty()) { + fetcher.runOnce(); + } + return fetcher; + } + + // ------------------------------------------------------------------------ + + private static final class QueueDrainerThread extends CheckedThread { + + private final FutureCompletingBlockingQueue<?> queue; + private volatile boolean running = true; + + QueueDrainerThread(FutureCompletingBlockingQueue<?> queue) { + super("Queue Drainer"); + setPriority(Thread.MAX_PRIORITY); + this.queue = queue; + 
} + + @Override + public void go() throws Exception { + while (running) { + try { + queue.take(); + } + catch (InterruptedException ignored) { + // fall through the loop + } + } + } + + public void shutdown() throws Exception { + running = false; + interrupt(); + sync(); + } + } }
