This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e72e48533902fe6a7271310736584e77b64d05b8
Author: Stephan Ewen <[email protected]>
AuthorDate: Mon Sep 14 20:55:57 2020 +0200

    [FLINK-18128][connectors] Ensure idle split fetchers lead to availability 
notifications.
---
 .../base/source/reader/fetcher/SplitFetcher.java   |  23 ++-
 .../FutureCompletingBlockingQueue.java             |   4 +
 .../source/reader/fetcher/SplitFetcherTest.java    | 185 +++++++++++++++++++++
 3 files changed, 206 insertions(+), 6 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index 289dc34..3beb0da 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -57,6 +57,10 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
        private final AtomicBoolean closed;
        private FetchTask<E, SplitT> fetchTask;
        private volatile SplitFetcherTask runningTask = null;
+
+       /** Flag whether this fetcher has no work assigned at the moment.
+        * Fetcher that have work (a split) assigned but are currently blocked 
(for example enqueueing
+        * a fetch and hitting the element queue limit) are NOT considered 
idle. */
        private volatile boolean isIdle;
 
        SplitFetcher(
@@ -81,7 +85,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
                                elementsQueue,
                                ids -> {
                                        ids.forEach(assignedSplits::remove);
-                                       updateIsIdle();
+                                       checkAndSetIdle();
                                },
                                id);
        }
@@ -168,7 +172,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
         */
        public void addSplits(List<SplitT> splitsToAdd) {
                maybeEnqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, 
splitChanges, assignedSplits));
-               updateIsIdle();
+               isIdle = false; // in case we were idle before
                wakeUp(true);
        }
 
@@ -292,6 +296,17 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
 
        }
 
+       private void checkAndSetIdle() {
+               final boolean nowIdle = assignedSplits.isEmpty() && 
taskQueue.isEmpty() && splitChanges.isEmpty();
+               if (nowIdle) {
+                       isIdle = true;
+
+                       // because the method might get invoked past the point 
when the source reader last checked
+                       // the elements queue, we need to notify availability 
in the case when we become idle
+                       elementsQueue.notifyAvailable();
+               }
+       }
+
        //--------------------- Helper class ------------------
 
        private static class DummySplitFetcherTask implements SplitFetcherTask {
@@ -316,8 +331,4 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
                        return name;
                }
        }
-
-       private void updateIsIdle() {
-               isIdle = taskQueue.isEmpty() && splitChanges.isEmpty() && 
assignedSplits.isEmpty();
-       }
 }
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
index ea0f030..dcbb66e 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
@@ -190,6 +190,10 @@ public class FutureCompletingBlockingQueue<T> {
                }
        }
 
+       public void notifyAvailable() {
+               futureNotifier.notifyComplete();
+       }
+
        // --------------- private helpers -------------------------
 
        private void enqueue(T element) {
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index 4fa99dd..6e27d95 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -19,10 +19,15 @@
 package org.apache.flink.connector.base.source.reader.fetcher;
 
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader;
+import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit;
+import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
+import org.apache.flink.core.testutils.CheckedThread;
 
 import org.junit.Test;
 
@@ -31,10 +36,12 @@ import java.util.Collections;
 import java.util.List;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -43,6 +50,113 @@ import static org.junit.Assert.assertTrue;
 public class SplitFetcherTest {
 
        @Test
+       public void testNewFetcherIsIdle() {
+               final SplitFetcher<Object, TestingSourceSplit> fetcher = 
createFetcher(new TestingSplitReader<>());
+               assertTrue(fetcher.isIdle());
+       }
+
+       @Test
+       public void testFetcherNotIdleAfterSplitAdded() {
+               final SplitFetcher<Object, TestingSourceSplit> fetcher = 
createFetcher(new TestingSplitReader<>());
+               final TestingSourceSplit split = new 
TestingSourceSplit("test-split");
+
+               fetcher.addSplits(Collections.singletonList(split));
+
+               assertFalse(fetcher.isIdle());
+
+               // need to loop here because the internal wakeup flag handling 
means we need multiple loops
+               while (fetcher.assignedSplits().isEmpty()) {
+                       fetcher.runOnce();
+                       assertFalse(fetcher.isIdle());
+               }
+       }
+
+       @Test
+       public void testIdleAfterFinishedSplitsEnqueued() {
+               final SplitFetcher<Object, TestingSourceSplit> fetcher = 
createFetcherWithSplit(
+                       "test-split", new 
TestingSplitReader<>(finishedSplitFetch("test-split")));
+
+               fetcher.runOnce();
+
+               assertTrue(fetcher.assignedSplits().isEmpty());
+               assertTrue(fetcher.isIdle());
+       }
+
+       @Test
+       public void testNotifiesWhenGoingIdle() {
+               final FutureNotifier notifier = new FutureNotifier();
+               final SplitFetcher<Object, TestingSourceSplit> fetcher = 
createFetcherWithSplit(
+                       "test-split",
+                       new FutureCompletingBlockingQueue<>(notifier),
+                       new 
TestingSplitReader<>(finishedSplitFetch("test-split")));
+
+               fetcher.runOnce();
+
+               assertTrue(fetcher.assignedSplits().isEmpty());
+               assertTrue(fetcher.isIdle());
+               assertTrue(notifier.future().isDone());
+       }
+
+       @Test
+       public void testNotifiesOlderFutureWhenGoingIdle() {
+               final FutureNotifier notifier = new FutureNotifier();
+               final SplitFetcher<Object, TestingSourceSplit> fetcher = 
createFetcherWithSplit(
+                       "test-split",
+                       new FutureCompletingBlockingQueue<>(notifier),
+                       new 
TestingSplitReader<>(finishedSplitFetch("test-split")));
+
+               final CompletableFuture<?> future = notifier.future();
+
+               fetcher.runOnce();
+
+               assertTrue(fetcher.assignedSplits().isEmpty());
+               assertTrue(fetcher.isIdle());
+               assertTrue(future.isDone());
+       }
+
+       @Test
+       public void testNotifiesWhenGoingIdleConcurrent() throws Exception {
+               final FutureNotifier notifier = new FutureNotifier();
+               final 
FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue =
+                               new FutureCompletingBlockingQueue<>(notifier);
+               final SplitFetcher<Object, TestingSourceSplit> fetcher = 
createFetcherWithSplit(
+                       "test-split", queue, new 
TestingSplitReader<>(finishedSplitFetch("test-split")));
+
+               final QueueDrainerThread queueDrainer = new 
QueueDrainerThread(queue);
+               queueDrainer.start();
+
+               try {
+                       fetcher.runOnce();
+
+                       assertTrue(notifier.future().isDone());
+               } finally {
+                       queueDrainer.shutdown();
+               }
+       }
+
+       @Test
+       public void testNotifiesOlderFutureWhenGoingIdleConcurrent() throws 
Exception {
+               final FutureNotifier notifier = new FutureNotifier();
+               final 
FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue =
+                       new FutureCompletingBlockingQueue<>(notifier);
+               final SplitFetcher<Object, TestingSourceSplit> fetcher = 
createFetcherWithSplit(
+                       "test-split", queue, new 
TestingSplitReader<>(finishedSplitFetch("test-split")));
+
+               final QueueDrainerThread queueDrainer = new 
QueueDrainerThread(queue);
+               queueDrainer.start();
+
+               final CompletableFuture<?> future = notifier.future();
+
+               try {
+                       fetcher.runOnce();
+
+                       assertTrue(future.isDone());
+               } finally {
+                       queueDrainer.shutdown();
+               }
+       }
+
+       @Test
        public void testWakeup() throws InterruptedException {
                final int numSplits = 3;
                final int numRecordsPerSplit = 10_000;
@@ -118,4 +232,75 @@ public class SplitFetcherTest {
                        interrupter.join();
                }
        }
+
+       // 
------------------------------------------------------------------------
+       //  testing utils
+       // 
------------------------------------------------------------------------
+
+       private static <E> RecordsBySplits<E> finishedSplitFetch(String 
splitId) {
+               return new RecordsBySplits<>(Collections.emptyMap(), 
Collections.singleton(splitId));
+       }
+
+       private static <E> SplitFetcher<E, TestingSourceSplit> createFetcher(
+                       final SplitReader<E, TestingSourceSplit> reader) {
+               return createFetcher(reader, new 
FutureCompletingBlockingQueue<>(new FutureNotifier()));
+       }
+
+       private static <E> SplitFetcher<E, TestingSourceSplit> createFetcher(
+                       final SplitReader<E, TestingSourceSplit> reader,
+                       final 
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> queue) {
+               return new SplitFetcher<>(0, queue, reader, () -> {});
+       }
+
+       private static <E> SplitFetcher<E, TestingSourceSplit> 
createFetcherWithSplit(
+                       final String splitId,
+                       final SplitReader<E, TestingSourceSplit> reader) {
+               return createFetcherWithSplit(splitId, new 
FutureCompletingBlockingQueue<>(new FutureNotifier()), reader);
+       }
+
+       private static <E> SplitFetcher<E, TestingSourceSplit> 
createFetcherWithSplit(
+                       final String splitId,
+                       final 
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> queue,
+                       final SplitReader<E, TestingSourceSplit> reader) {
+
+               final SplitFetcher<E, TestingSourceSplit> fetcher = 
createFetcher(reader, queue);
+
+               fetcher.addSplits(Collections.singletonList(new 
TestingSourceSplit(splitId)));
+               while (fetcher.assignedSplits().isEmpty()) {
+                       fetcher.runOnce();
+               }
+               return fetcher;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private static final class QueueDrainerThread extends CheckedThread {
+
+               private final FutureCompletingBlockingQueue<?> queue;
+               private volatile boolean running = true;
+
+               QueueDrainerThread(FutureCompletingBlockingQueue<?> queue) {
+                       super("Queue Drainer");
+                       setPriority(Thread.MAX_PRIORITY);
+                       this.queue = queue;
+               }
+
+               @Override
+               public void go() throws Exception {
+                       while (running) {
+                               try {
+                                       queue.take();
+                               }
+                               catch (InterruptedException ignored) {
+                                       // fall through the loop
+                               }
+                       }
+               }
+
+               public void shutdown() throws Exception {
+                       running = false;
+                       interrupt();
+                       sync();
+               }
+       }
 }

Reply via email to