This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 257a0da75feb0cf791de01b0ec43467cc433b955 Author: Stephan Ewen <[email protected]> AuthorDate: Tue Sep 15 23:36:44 2020 +0200 [FLINK-19251][connectors] Avoid confusing queue handling in "SplitReader.handleSplitsChanges()" This removes the queue (and repeated queue passing logic) and simply passes a list of split changes directly and once, for the fetcher to handle. This closes #13400 --- .../base/source/reader/fetcher/AddSplitsTask.java | 18 +++++------------- .../base/source/reader/fetcher/SplitFetcher.java | 10 ++-------- .../base/source/reader/splitreader/SplitReader.java | 5 ++--- .../base/source/reader/SourceReaderBaseTest.java | 8 ++------ .../reader/fetcher/SplitFetcherManagerTest.java | 4 +--- .../base/source/reader/fetcher/SplitFetcherTest.java | 2 +- .../base/source/reader/mocks/MockBaseSource.java | 2 +- .../base/source/reader/mocks/MockSplitReader.java | 19 +++++++------------ .../base/source/reader/mocks/TestingSplitReader.java | 5 +---- 9 files changed, 22 insertions(+), 51 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java index 82e529a..12b7d5d 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java @@ -21,43 +21,35 @@ package org.apache.flink.connector.base.source.reader.fetcher; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; -import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; import java.util.List; import 
java.util.Map; -import java.util.Queue; /** * The task to add splits. */ class AddSplitsTask<SplitT extends SourceSplit> implements SplitFetcherTask { + private final SplitReader<?, SplitT> splitReader; private final List<SplitT> splitsToAdd; - private final Queue<SplitsChange<SplitT>> splitsChanges; private final Map<String, SplitT> assignedSplits; - private boolean splitsChangesAdded; AddSplitsTask( SplitReader<?, SplitT> splitReader, List<SplitT> splitsToAdd, - Queue<SplitsChange<SplitT>> splitsChanges, Map<String, SplitT> assignedSplits) { this.splitReader = splitReader; this.splitsToAdd = splitsToAdd; - this.splitsChanges = splitsChanges; this.assignedSplits = assignedSplits; - this.splitsChangesAdded = false; } @Override public boolean run() { - if (!splitsChangesAdded) { - splitsChanges.add(new SplitsAddition<>(splitsToAdd)); - splitsToAdd.forEach(s -> assignedSplits.put(s.splitId(), s)); - splitsChangesAdded = true; + for (SplitT s : splitsToAdd) { + assignedSplits.put(s.splitId(), s); } - splitReader.handleSplitsChanges(splitsChanges); - return splitsChanges.isEmpty(); + splitReader.handleSplitsChanges(new SplitsAddition<>(splitsToAdd)); + return true; } @Override diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java index db5a203..633c452 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java @@ -21,17 +21,14 @@ package org.apache.flink.connector.base.source.reader.fetcher; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import 
org.apache.flink.connector.base.source.reader.splitreader.SplitReader; -import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.concurrent.BlockingDeque; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicBoolean; @@ -47,8 +44,6 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { private final BlockingDeque<SplitFetcherTask> taskQueue; // track the assigned splits so we can suspend the reader when there is no splits assigned. private final Map<String, SplitT> assignedSplits; - /** The current split assignments for this fetcher. */ - private final Queue<SplitsChange<SplitT>> splitChanges; private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue; private final SplitReader<E, SplitT> splitReader; private final Runnable shutdownHook; @@ -70,7 +65,6 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { this.id = id; this.taskQueue = new LinkedBlockingDeque<>(); - this.splitChanges = new LinkedList<>(); this.elementsQueue = elementsQueue; this.assignedSplits = new HashMap<>(); this.splitReader = splitReader; @@ -148,7 +142,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { * @param splitsToAdd the splits to add. 
*/ public void addSplits(List<SplitT> splitsToAdd) { - maybeEnqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, splitChanges, assignedSplits)); + maybeEnqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, assignedSplits)); isIdle = false; // in case we were idle before wakeUp(true); } @@ -268,7 +262,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { } private void checkAndSetIdle() { - final boolean nowIdle = assignedSplits.isEmpty() && taskQueue.isEmpty() && splitChanges.isEmpty(); + final boolean nowIdle = assignedSplits.isEmpty() && taskQueue.isEmpty(); if (nowIdle) { isIdle = true; diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java index 9114614..7504452 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java @@ -22,7 +22,6 @@ import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import java.io.IOException; -import java.util.Queue; /** * An interface used to read from splits. The implementation could either read from a single split or from @@ -49,9 +48,9 @@ public interface SplitReader<E, SplitT extends SourceSplit> { /** * Handle the split changes. This call should be non-blocking. * - * @param splitsChanges a queue with split changes that has not been handled by this SplitReader. + * @param splitsChanges the split changes that the SplitReader needs to handle. 
*/ - void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges); + void handleSplitsChanges(SplitsChange<SplitT> splitsChanges); /** * Wake up the split reader in case the fetcher thread is blocking in diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java index 84eeb4e..ce32521 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java @@ -42,7 +42,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Queue; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -73,10 +72,7 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> } @Override - public void handleSplitsChanges(Queue<SplitsChange<MockSourceSplit>> splitsChanges) { - // We have to handle split changes first, otherwise fetch will not be called. 
- splitsChanges.clear(); - } + public void handleSplitsChanges(SplitsChange<MockSourceSplit> splitsChanges) {} @Override public void wakeUp() { @@ -127,7 +123,7 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue = new FutureCompletingBlockingQueue<>(); MockSplitReader mockSplitReader = - new MockSplitReader(2, true, true); + new MockSplitReader(2, true); return new MockSourceReader( elementsQueue, () -> mockSplitReader, diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java index 3ff25e0..6d7b8b1 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java @@ -141,9 +141,7 @@ public class SplitFetcherManagerTest { } @Override - public void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges) { - splitsChanges.clear(); - } + public void handleSplitsChanges(SplitsChange<SplitT> splitsChanges) {} @Override public void wakeUp() {} diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java index e5f5faf..5027e3f 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java @@ -166,7 +166,7 @@ 
public class SplitFetcherTest { new SplitFetcher<>( 0, elementQueue, - new MockSplitReader(2, true, true), + new MockSplitReader(2, true), () -> {}); // Prepare the splits. diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java index 2681e5a..5eaa4fb 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java @@ -75,7 +75,7 @@ public class MockBaseSource implements Source<Integer, MockSourceSplit, List<Moc config.setLong(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 30000L); return new MockSourceReader( elementsQueue, - () -> new MockSplitReader(2, true, true), + () -> new MockSplitReader(2, true), config, readerContext); } diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java index 4cb738a..f3d15f6 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java @@ -27,7 +27,6 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; import java.util.LinkedHashMap; import java.util.Map; -import java.util.Queue; /** * A mock split reader for unit tests. The mock split reader provides configurable behaviours. 
@@ -42,7 +41,6 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> { private final Map<String, MockSourceSplit> splits = new LinkedHashMap<>(); private final int numRecordsPerSplitPerFetch; private final boolean blockingFetch; - private final boolean handleSplitsInOneShot; private final Object wakeupLock = new Object(); private volatile Thread threadInBlocking; @@ -50,11 +48,9 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> { public MockSplitReader( int numRecordsPerSplitPerFetch, - boolean blockingFetch, - boolean handleSplitsInOneShot) { + boolean blockingFetch) { this.numRecordsPerSplitPerFetch = numRecordsPerSplitPerFetch; this.blockingFetch = blockingFetch; - this.handleSplitsInOneShot = handleSplitsInOneShot; } @Override @@ -63,13 +59,12 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> { } @Override - public void handleSplitsChanges(Queue<SplitsChange<MockSourceSplit>> splitsChanges) { - do { - SplitsChange<MockSourceSplit> splitsChange = splitsChanges.poll(); - if (splitsChange instanceof SplitsAddition) { - splitsChange.splits().forEach(s -> splits.put(s.splitId(), s)); - } - } while (handleSplitsInOneShot && !splitsChanges.isEmpty()); + public void handleSplitsChanges(SplitsChange<MockSourceSplit> splitsChange) { + if (splitsChange instanceof SplitsAddition) { + splitsChange.splits().forEach(s -> splits.put(s.splitId(), s)); + } else { + throw new IllegalArgumentException("Do not recognize split change: " + splitsChange); + } } @Override diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java index 5643088..2e6c760 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java +++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java @@ -26,7 +26,6 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; import java.io.IOException; import java.util.ArrayDeque; import java.util.Arrays; -import java.util.Queue; /** * A {@code SplitReader} that returns a pre-defined set of records (by split). @@ -59,9 +58,7 @@ public class TestingSplitReader<E, SplitT extends SourceSplit> implements SplitR } @Override - public void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges) { - splitsChanges.clear(); - } + public void handleSplitsChanges(SplitsChange<SplitT> splitsChanges) {} @Override public void wakeUp() {
