This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 847dc9b7d9b6c53ee25e72f48d6cd0d7cc81dadc Author: sxnan <[email protected]> AuthorDate: Tue Nov 10 15:17:52 2020 +0800 [FLINK-19253][connector/common] Synchronize setting the isIdle flag in SourceReaderBase. --- .../base/source/reader/fetcher/SplitFetcher.java | 25 ++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java index 6b523dc..6d6453f 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java @@ -26,6 +26,8 @@ import org.apache.flink.connector.base.source.reader.synchronization.FutureCompl import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.concurrent.GuardedBy; + import java.util.HashMap; import java.util.List; import java.util.Map; @@ -54,9 +56,12 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { private final FetchTask<E, SplitT> fetchTask; private volatile SplitFetcherTask runningTask = null; + private final Object lock = new Object(); + /** Flag whether this fetcher has no work assigned at the moment. * Fetcher that have work (a split) assigned but are currently blocked (for example enqueueing * a fetch and hitting the element queue limit) are NOT considered idle. */ + @GuardedBy("lock") private volatile boolean isIdle; SplitFetcher( @@ -157,14 +162,15 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { * @param splitsToAdd the splits to add. */ public void addSplits(List<SplitT> splitsToAdd) { - isIdle = false; // in case we were idle before enqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, assignedSplits)); wakeUp(true); } public void enqueueTask(SplitFetcherTask task) { - isIdle = false; - taskQueue.offer(task); + synchronized (lock) { + taskQueue.offer(task); + isIdle = false; + } } public SplitReader<E, SplitT> getSplitReader() { @@ -286,9 +292,12 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { } private void checkAndSetIdle() { - final boolean nowIdle = assignedSplits.isEmpty() && taskQueue.isEmpty(); - if (nowIdle) { - isIdle = true; + if (shouldIdle()) { + synchronized (lock) { + if (shouldIdle()) { + isIdle = true; + } + } // because the method might get invoked past the point when the source reader last checked // the elements queue, we need to notify availability in the case when we become idle @@ -296,6 +305,10 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { } } + private boolean shouldIdle() { + return assignedSplits.isEmpty() && taskQueue.isEmpty(); + } + //--------------------- Helper class ------------------ private static class DummySplitFetcherTask implements SplitFetcherTask {
