This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 847dc9b7d9b6c53ee25e72f48d6cd0d7cc81dadc
Author: sxnan <[email protected]>
AuthorDate: Tue Nov 10 15:17:52 2020 +0800

    [FLINK-19253][connector/common] Synchronize setting the isIdle flag in 
SourceReaderBase.
---
 .../base/source/reader/fetcher/SplitFetcher.java   | 25 ++++++++++++++++------
 1 file changed, 19 insertions(+), 6 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index 6b523dc..6d6453f 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -26,6 +26,8 @@ import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompl
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.concurrent.GuardedBy;
+
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -54,9 +56,12 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
        private final FetchTask<E, SplitT> fetchTask;
        private volatile SplitFetcherTask runningTask = null;
 
+       private final Object lock = new Object();
+
        /** Flag whether this fetcher has no work assigned at the moment.
         * Fetcher that have work (a split) assigned but are currently blocked 
(for example enqueueing
         * a fetch and hitting the element queue limit) are NOT considered 
idle. */
+       @GuardedBy("lock")
        private volatile boolean isIdle;
 
        SplitFetcher(
@@ -157,14 +162,15 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
         * @param splitsToAdd the splits to add.
         */
        public void addSplits(List<SplitT> splitsToAdd) {
-               isIdle = false; // in case we were idle before
                enqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, 
assignedSplits));
                wakeUp(true);
        }
 
        public void enqueueTask(SplitFetcherTask task) {
-               isIdle = false;
-               taskQueue.offer(task);
+               synchronized (lock) {
+                       taskQueue.offer(task);
+                       isIdle = false;
+               }
        }
 
        public SplitReader<E, SplitT> getSplitReader() {
@@ -286,9 +292,12 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
        }
 
        private void checkAndSetIdle() {
-               final boolean nowIdle = assignedSplits.isEmpty() && 
taskQueue.isEmpty();
-               if (nowIdle) {
-                       isIdle = true;
+               if (shouldIdle()) {
+                       synchronized (lock) {
+                               if (shouldIdle()) {
+                                       isIdle = true;
+                               }
+                       }
 
                        // because the method might get invoked past the point 
when the source reader last checked
                        // the elements queue, we need to notify availability 
in the case when we become idle
@@ -296,6 +305,10 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
                }
        }
 
+       private boolean shouldIdle() {
+               return assignedSplits.isEmpty() && taskQueue.isEmpty();
+       }
+
        //--------------------- Helper class ------------------
 
        private static class DummySplitFetcherTask implements SplitFetcherTask {

Reply via email to