This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 84eb280ddeb9af9a4e0726da68094bf115336166
Author: Jiangjie (Becket) Qin <jiangjie...@alibaba-inc.com>
AuthorDate: Sat Nov 7 00:28:10 2020 +0800

    [hotfix][connector/common] Allow adding external tasks to the SplitFetcher.
    It helps avoid synchronization between the fetcher thread and the main thread.
---
 .../reader/SingleThreadMultiplexSourceReaderBase.java | 19 +++++++++++++++++++
 .../base/source/reader/fetcher/FetchTask.java         |  6 ++++--
 .../base/source/reader/fetcher/SplitFetcher.java      | 14 ++++++++++++--
 .../source/reader/fetcher/SplitFetcherManager.java    |  1 +
 4 files changed, 36 insertions(+), 4 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
index f5806d1..a23da62 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
@@ -94,4 +94,23 @@ public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT extends
                        config,
                        context);
        }
+
+       /**
+        * This constructor behaves like
+        * {@link #SingleThreadMultiplexSourceReaderBase(Supplier, 
RecordEmitter, Configuration, SourceReaderContext)},
+        * but accepts a specific {@link FutureCompletingBlockingQueue} and 
{@link SingleThreadFetcherManager}.
+        */
+       public SingleThreadMultiplexSourceReaderBase(
+                       FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
+                       SingleThreadFetcherManager<E, SplitT> 
splitFetcherManager,
+                       RecordEmitter<E, T, SplitStateT> recordEmitter,
+                       Configuration config,
+                       SourceReaderContext context) {
+               super(
+                       elementsQueue,
+                       splitFetcherManager,
+                       recordEmitter,
+                       config,
+                       context);
+       }
 }
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
index 743e763..8bafdda 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
@@ -62,8 +62,10 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
                                // The order matters here. We must first put 
the last records into the queue.
                                // This ensures the handling of the fetched 
records is atomic to wakeup.
                                if (elementsQueue.put(fetcherIndex, 
lastRecords)) {
-                                       // The callback does not throw 
InterruptedException.
-                                       
splitFinishedCallback.accept(lastRecords.finishedSplits());
+                                       if 
(!lastRecords.finishedSplits().isEmpty()) {
+                                               // The callback does not throw 
InterruptedException.
+                                               
splitFinishedCallback.accept(lastRecords.finishedSplits());
+                                       }
                                        lastRecords = null;
                                }
                        }
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index 96663ab..86cff2d 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -79,7 +79,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
                                elementsQueue,
                                ids -> {
                                        ids.forEach(assignedSplits::remove);
-                                       checkAndSetIdle();
+                                       LOG.info("Finished reading from splits 
{}", ids);
                                },
                                id);
        }
@@ -124,6 +124,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
                                LOG.debug("Finished running task {}", 
runningTask);
                                // the task has finished running. Set it to 
null so it won't be enqueued.
                                runningTask = null;
+                               checkAndSetIdle();
                        }
                } catch (Exception e) {
                        throw new RuntimeException(String.format(
@@ -148,11 +149,20 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
         * @param splitsToAdd the splits to add.
         */
        public void addSplits(List<SplitT> splitsToAdd) {
-               maybeEnqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, 
assignedSplits));
                isIdle = false; // in case we were idle before
+               enqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, 
assignedSplits));
                wakeUp(true);
        }
 
+       public void enqueueTask(SplitFetcherTask task) {
+               isIdle = false;
+               taskQueue.offer(task);
+       }
+
+       public SplitReader<E, SplitT> getSplitReader() {
+               return splitReader;
+       }
+
        /**
         * Shutdown the split fetcher.
         */
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
index 7a20a59..efd60f4 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
@@ -149,6 +149,7 @@ public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
                        Map.Entry<Integer, SplitFetcher<E, SplitT>> entry = 
iter.next();
                        SplitFetcher<E, SplitT> fetcher = entry.getValue();
                        if (fetcher.isIdle()) {
+                               LOG.info("Closing splitFetcher {} because it is 
idle.", entry.getKey());
                                fetcher.shutdown();
                                iter.remove();
                        }

Reply via email to