This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 257a0da75feb0cf791de01b0ec43467cc433b955
Author: Stephan Ewen <[email protected]>
AuthorDate: Tue Sep 15 23:36:44 2020 +0200

    [FLINK-19251][connectors] Avoid confusing queue handling in "SplitReader.handleSplitsChanges()"
    
    This removes the queue (and repeated queue passing logic) and simply passes a list of split changes
    directly and once, for the fetcher to handle.
    
    This closes #13400
---
 .../base/source/reader/fetcher/AddSplitsTask.java     | 18 +++++-------------
 .../base/source/reader/fetcher/SplitFetcher.java      | 10 ++--------
 .../base/source/reader/splitreader/SplitReader.java   |  5 ++---
 .../base/source/reader/SourceReaderBaseTest.java      |  8 ++------
 .../reader/fetcher/SplitFetcherManagerTest.java       |  4 +---
 .../base/source/reader/fetcher/SplitFetcherTest.java  |  2 +-
 .../base/source/reader/mocks/MockBaseSource.java      |  2 +-
 .../base/source/reader/mocks/MockSplitReader.java     | 19 +++++++------------
 .../base/source/reader/mocks/TestingSplitReader.java  |  5 +----
 9 files changed, 22 insertions(+), 51 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
index 82e529a..12b7d5d 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
@@ -21,43 +21,35 @@ package 
org.apache.flink.connector.base.source.reader.fetcher;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import 
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
-import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 
 /**
  * The task to add splits.
  */
 class AddSplitsTask<SplitT extends SourceSplit> implements SplitFetcherTask {
+
        private final SplitReader<?, SplitT> splitReader;
        private final List<SplitT> splitsToAdd;
-       private final Queue<SplitsChange<SplitT>> splitsChanges;
        private final Map<String, SplitT> assignedSplits;
-       private boolean splitsChangesAdded;
 
        AddSplitsTask(
                        SplitReader<?, SplitT> splitReader,
                        List<SplitT> splitsToAdd,
-                       Queue<SplitsChange<SplitT>> splitsChanges,
                        Map<String, SplitT> assignedSplits) {
                this.splitReader = splitReader;
                this.splitsToAdd = splitsToAdd;
-               this.splitsChanges = splitsChanges;
                this.assignedSplits = assignedSplits;
-               this.splitsChangesAdded = false;
        }
 
        @Override
        public boolean run() {
-               if (!splitsChangesAdded) {
-                       splitsChanges.add(new SplitsAddition<>(splitsToAdd));
-                       splitsToAdd.forEach(s -> 
assignedSplits.put(s.splitId(), s));
-                       splitsChangesAdded = true;
+               for (SplitT s : splitsToAdd) {
+                       assignedSplits.put(s.splitId(), s);
                }
-               splitReader.handleSplitsChanges(splitsChanges);
-               return splitsChanges.isEmpty();
+               splitReader.handleSplitsChanges(new 
SplitsAddition<>(splitsToAdd));
+               return true;
        }
 
        @Override
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index db5a203..633c452 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -21,17 +21,14 @@ package 
org.apache.flink.connector.base.source.reader.fetcher;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
-import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -47,8 +44,6 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
        private final BlockingDeque<SplitFetcherTask> taskQueue;
        // track the assigned splits so we can suspend the reader when there is 
no splits assigned.
        private final Map<String, SplitT> assignedSplits;
-       /** The current split assignments for this fetcher. */
-       private final Queue<SplitsChange<SplitT>> splitChanges;
        private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue;
        private final SplitReader<E, SplitT> splitReader;
        private final Runnable shutdownHook;
@@ -70,7 +65,6 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
 
                this.id = id;
                this.taskQueue = new LinkedBlockingDeque<>();
-               this.splitChanges = new LinkedList<>();
                this.elementsQueue = elementsQueue;
                this.assignedSplits = new HashMap<>();
                this.splitReader = splitReader;
@@ -148,7 +142,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
         * @param splitsToAdd the splits to add.
         */
        public void addSplits(List<SplitT> splitsToAdd) {
-               maybeEnqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, 
splitChanges, assignedSplits));
+               maybeEnqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, 
assignedSplits));
                isIdle = false; // in case we were idle before
                wakeUp(true);
        }
@@ -268,7 +262,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> 
implements Runnable {
        }
 
        private void checkAndSetIdle() {
-               final boolean nowIdle = assignedSplits.isEmpty() && 
taskQueue.isEmpty() && splitChanges.isEmpty();
+               final boolean nowIdle = assignedSplits.isEmpty() && 
taskQueue.isEmpty();
                if (nowIdle) {
                        isIdle = true;
 
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
index 9114614..7504452 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 
 import java.io.IOException;
-import java.util.Queue;
 
 /**
  * An interface used to read from splits. The implementation could either read 
from a single split or from
@@ -49,9 +48,9 @@ public interface SplitReader<E, SplitT extends SourceSplit> {
        /**
         * Handle the split changes. This call should be non-blocking.
         *
-        * @param splitsChanges a queue with split changes that has not been 
handled by this SplitReader.
+        * @param splitsChanges the split changes that the SplitReader needs to 
handle.
         */
-       void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges);
+       void handleSplitsChanges(SplitsChange<SplitT> splitsChanges);
 
        /**
         * Wake up the split reader in case the fetcher thread is blocking in
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
index 84eeb4e..ce32521 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
@@ -42,7 +42,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.Queue;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -73,10 +72,7 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
                                }
 
                                @Override
-                               public void 
handleSplitsChanges(Queue<SplitsChange<MockSourceSplit>> splitsChanges) {
-                                       // We have to handle split changes 
first, otherwise fetch will not be called.
-                                       splitsChanges.clear();
-                               }
+                               public void 
handleSplitsChanges(SplitsChange<MockSourceSplit> splitsChanges) {}
 
                                @Override
                                public void wakeUp() {
@@ -127,7 +123,7 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
                FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> 
elementsQueue =
                        new FutureCompletingBlockingQueue<>();
                MockSplitReader mockSplitReader =
-                       new MockSplitReader(2, true, true);
+                       new MockSplitReader(2, true);
                return new MockSourceReader(
                        elementsQueue,
                        () -> mockSplitReader,
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
index 3ff25e0..6d7b8b1 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
@@ -141,9 +141,7 @@ public class SplitFetcherManagerTest {
                }
 
                @Override
-               public void handleSplitsChanges(Queue<SplitsChange<SplitT>> 
splitsChanges) {
-                       splitsChanges.clear();
-               }
+               public void handleSplitsChanges(SplitsChange<SplitT> 
splitsChanges) {}
 
                @Override
                public void wakeUp() {}
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index e5f5faf..5027e3f 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -166,7 +166,7 @@ public class SplitFetcherTest {
                                new SplitFetcher<>(
                                                0,
                                                elementQueue,
-                                               new MockSplitReader(2, true, 
true),
+                                               new MockSplitReader(2, true),
                                                () -> {});
 
                // Prepare the splits.
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
index 2681e5a..5eaa4fb 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
@@ -75,7 +75,7 @@ public class MockBaseSource implements Source<Integer, 
MockSourceSplit, List<Moc
                config.setLong(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 
30000L);
                return new MockSourceReader(
                                elementsQueue,
-                               () -> new MockSplitReader(2, true, true),
+                               () -> new MockSplitReader(2, true),
                                config,
                                readerContext);
        }
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
index 4cb738a..f3d15f6 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
@@ -27,7 +27,6 @@ import 
org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Queue;
 
 /**
  * A mock split reader for unit tests. The mock split reader provides 
configurable behaviours.
@@ -42,7 +41,6 @@ public class MockSplitReader implements SplitReader<int[], 
MockSourceSplit> {
        private final Map<String, MockSourceSplit> splits = new 
LinkedHashMap<>();
        private final int numRecordsPerSplitPerFetch;
        private final boolean blockingFetch;
-       private final boolean handleSplitsInOneShot;
 
        private final Object wakeupLock = new Object();
        private volatile Thread threadInBlocking;
@@ -50,11 +48,9 @@ public class MockSplitReader implements SplitReader<int[], 
MockSourceSplit> {
 
        public MockSplitReader(
                        int numRecordsPerSplitPerFetch,
-                       boolean blockingFetch,
-                       boolean handleSplitsInOneShot) {
+                       boolean blockingFetch) {
                this.numRecordsPerSplitPerFetch = numRecordsPerSplitPerFetch;
                this.blockingFetch = blockingFetch;
-               this.handleSplitsInOneShot = handleSplitsInOneShot;
        }
 
        @Override
@@ -63,13 +59,12 @@ public class MockSplitReader implements SplitReader<int[], 
MockSourceSplit> {
        }
 
        @Override
-       public void handleSplitsChanges(Queue<SplitsChange<MockSourceSplit>> 
splitsChanges) {
-               do {
-                       SplitsChange<MockSourceSplit> splitsChange = 
splitsChanges.poll();
-                       if (splitsChange instanceof SplitsAddition) {
-                               splitsChange.splits().forEach(s -> 
splits.put(s.splitId(), s));
-                       }
-               } while (handleSplitsInOneShot && !splitsChanges.isEmpty());
+       public void handleSplitsChanges(SplitsChange<MockSourceSplit> 
splitsChange) {
+               if (splitsChange instanceof SplitsAddition) {
+                       splitsChange.splits().forEach(s -> 
splits.put(s.splitId(), s));
+               } else {
+                       throw new IllegalArgumentException("Do not recognize 
split change: " + splitsChange);
+               }
        }
 
        @Override
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
index 5643088..2e6c760 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
@@ -26,7 +26,6 @@ import 
org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Arrays;
-import java.util.Queue;
 
 /**
  * A {@code SplitReader} that returns a pre-defined set of records (by split).
@@ -59,9 +58,7 @@ public class TestingSplitReader<E, SplitT extends 
SourceSplit> implements SplitR
        }
 
        @Override
-       public void handleSplitsChanges(Queue<SplitsChange<SplitT>> 
splitsChanges) {
-               splitsChanges.clear();
-       }
+       public void handleSplitsChanges(SplitsChange<SplitT> splitsChanges) {}
 
        @Override
        public void wakeUp() {

Reply via email to