This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit bd8f6476d55f46a9cb2493bf68a28f3247905dd1
Author: Stephan Ewen <[email protected]>
AuthorDate: Mon Sep 14 20:51:56 2020 +0200

    [refactor][core] Eagerly initialize the FetchTask to support proper unit testing

    Previously, the FetchTask was constructed lazily in the run() method, which gets in the way of unit testing
    via the runOnce() method.
---
 .../base/source/reader/fetcher/SplitFetcher.java | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index fa1442e..289dc34 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -75,20 +75,21 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
         this.isIdle = true;
         this.wakeUp = new AtomicBoolean(false);
         this.closed = new AtomicBoolean(false);
+
+        this.fetchTask = new FetchTask<>(
+            splitReader,
+            elementsQueue,
+            ids -> {
+                ids.forEach(assignedSplits::remove);
+                updateIsIdle();
+            },
+            id);
     }
 
     @Override
     public void run() {
         LOG.info("Starting split fetcher {}", id);
         try {
-            // Remove the split from the assignments if it is already done.
-            this.fetchTask = new FetchTask<>(
-                    splitReader,
-                    elementsQueue,
-                    ids -> {
-                        ids.forEach(assignedSplits::remove);
-                        updateIsIdle();
-                    }, id);
             while (!closed.get()) {
                 runOnce();
             }
