This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bd8f6476d55f46a9cb2493bf68a28f3247905dd1
Author: Stephan Ewen <[email protected]>
AuthorDate: Mon Sep 14 20:51:56 2020 +0200

    [refactor][core] Eagerly initialize the FetchTask to support proper unit testing
    
    Previously, the FetchTask was constructed lazily in the run() method, which gets in the
    way of unit testing via the runOnce() method.
---
 .../base/source/reader/fetcher/SplitFetcher.java        | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index fa1442e..289dc34 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -75,20 +75,21 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
                this.isIdle = true;
                this.wakeUp = new AtomicBoolean(false);
                this.closed = new AtomicBoolean(false);
+
+               this.fetchTask = new FetchTask<>(
+                               splitReader,
+                               elementsQueue,
+                               ids -> {
+                                       ids.forEach(assignedSplits::remove);
+                                       updateIsIdle();
+                               },
+                               id);
        }
 
        @Override
        public void run() {
                LOG.info("Starting split fetcher {}", id);
                try {
-                       // Remove the split from the assignments if it is already done.
-                       this.fetchTask = new FetchTask<>(
-                                       splitReader,
-                                       elementsQueue,
-                                       ids -> {
-                                               ids.forEach(assignedSplits::remove);
-                                               updateIsIdle();
-                                       }, id);
                        while (!closed.get()) {
                                runOnce();
                        }

Reply via email to