This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 84eb280ddeb9af9a4e0726da68094bf115336166 Author: Jiangjie (Becket) Qin <jiangjie...@alibaba-inc.com> AuthorDate: Sat Nov 7 00:28:10 2020 +0800 [hotfix][connector/common] Allow adding external tasks to the SplitFetcher. It helps avoid synchronizations between fetcher thread and main thread. --- .../reader/SingleThreadMultiplexSourceReaderBase.java | 19 +++++++++++++++++++ .../base/source/reader/fetcher/FetchTask.java | 6 ++++-- .../base/source/reader/fetcher/SplitFetcher.java | 14 ++++++++++++-- .../source/reader/fetcher/SplitFetcherManager.java | 1 + 4 files changed, 36 insertions(+), 4 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java index f5806d1..a23da62 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java @@ -94,4 +94,23 @@ public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT extends config, context); } + + /** + * This constructor behaves like + * {@link #SingleThreadMultiplexSourceReaderBase(Supplier, RecordEmitter, Configuration, SourceReaderContext)}, + * but accepts a specific {@link FutureCompletingBlockingQueue} and {@link SingleThreadFetcherManager}. + */ + public SingleThreadMultiplexSourceReaderBase( + FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, + SingleThreadFetcherManager<E, SplitT> splitFetcherManager, + RecordEmitter<E, T, SplitStateT> recordEmitter, + Configuration config, + SourceReaderContext context) { + super( + elementsQueue, + splitFetcherManager, + recordEmitter, + config, + context); + } } diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java index 743e763..8bafdda 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java @@ -62,8 +62,10 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask { // The order matters here. We must first put the last records into the queue. // This ensures the handling of the fetched records is atomic to wakeup. if (elementsQueue.put(fetcherIndex, lastRecords)) { - // The callback does not throw InterruptedException. - splitFinishedCallback.accept(lastRecords.finishedSplits()); + if (!lastRecords.finishedSplits().isEmpty()) { + // The callback does not throw InterruptedException. + splitFinishedCallback.accept(lastRecords.finishedSplits()); + } lastRecords = null; } } diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java index 96663ab..86cff2d 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java @@ -79,7 +79,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { elementsQueue, ids -> { ids.forEach(assignedSplits::remove); - checkAndSetIdle(); + LOG.info("Finished reading from splits {}", ids); }, id); } @@ -124,6 +124,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { LOG.debug("Finished running task {}", runningTask); // the task has finished running. Set it to null so it won't be enqueued. runningTask = null; + checkAndSetIdle(); } } catch (Exception e) { throw new RuntimeException(String.format( @@ -148,11 +149,20 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { * @param splitsToAdd the splits to add. */ public void addSplits(List<SplitT> splitsToAdd) { - maybeEnqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, assignedSplits)); isIdle = false; // in case we were idle before + enqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, assignedSplits)); wakeUp(true); } + public void enqueueTask(SplitFetcherTask task) { + isIdle = false; + taskQueue.offer(task); + } + + public SplitReader<E, SplitT> getSplitReader() { + return splitReader; + } + /** * Shutdown the split fetcher. */ diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java index 7a20a59..efd60f4 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java @@ -149,6 +149,7 @@ public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> { Map.Entry<Integer, SplitFetcher<E, SplitT>> entry = iter.next(); SplitFetcher<E, SplitT> fetcher = entry.getValue(); if (fetcher.isIdle()) { + LOG.info("Closing splitFetcher {} because it is idle.", entry.getKey()); fetcher.shutdown(); iter.remove(); }