This is an automated email from the ASF dual-hosted git repository.
renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6afee1de658 [FLINK-34196][Connectors][FLIP-389] Annotate
SingleThreadFetcherManager as PublicEvolving
6afee1de658 is described below
commit 6afee1de6585074e0df6205f1f52bb239dcf4a77
Author: Hongshun Wang <[email protected]>
AuthorDate: Wed Jan 17 17:02:19 2024 +0800
[FLINK-34196][Connectors][FLIP-389] Annotate SingleThreadFetcherManager as
PublicEvolving
---
.../SingleThreadMultiplexSourceReaderBase.java | 48 +++++++++++++--
.../base/source/reader/SourceReaderBase.java | 44 +++++++++++++
.../reader/fetcher/SingleThreadFetcherManager.java | 51 +++++++++++++--
.../base/source/reader/fetcher/SplitFetcher.java | 4 +-
.../source/reader/fetcher/SplitFetcherManager.java | 72 +++++++++++++++++++++-
.../source/reader/fetcher/SplitFetcherTask.java | 4 +-
.../base/source/hybrid/HybridSourceReaderTest.java | 11 +---
.../base/source/reader/SourceReaderBaseTest.java | 56 +++--------------
.../reader/fetcher/SplitFetcherManagerTest.java | 21 ++++---
.../SplitFetcherPauseResumeSplitReaderTest.java | 18 ++----
.../base/source/reader/mocks/MockBaseSource.java | 6 +-
.../base/source/reader/mocks/MockSourceReader.java | 14 +----
12 files changed, 240 insertions(+), 109 deletions(-)
diff --git
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
index 72cb5e2f8f5..f24fd3879f2 100644
---
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
+++
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
@@ -74,10 +74,8 @@ public abstract class SingleThreadMultiplexSourceReaderBase<
RecordEmitter<E, T, SplitStateT> recordEmitter,
Configuration config,
SourceReaderContext context) {
- this(
- new FutureCompletingBlockingQueue<>(
-
config.get(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)),
- splitReaderSupplier,
+ super(
+ new SingleThreadFetcherManager<>(splitReaderSupplier, config),
recordEmitter,
config,
context);
@@ -87,7 +85,11 @@ public abstract class SingleThreadMultiplexSourceReaderBase<
* This constructor behaves like {@link
#SingleThreadMultiplexSourceReaderBase(Supplier,
* RecordEmitter, Configuration, SourceReaderContext)}, but accepts a
specific {@link
* FutureCompletingBlockingQueue}.
+ *
+ * @deprecated Please use {@link
#SingleThreadMultiplexSourceReaderBase(Supplier, RecordEmitter,
+ * Configuration, SourceReaderContext)} instead.
*/
+ @Deprecated
public SingleThreadMultiplexSourceReaderBase(
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>>
elementsQueue,
Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
@@ -106,7 +108,12 @@ public abstract class
SingleThreadMultiplexSourceReaderBase<
* This constructor behaves like {@link
#SingleThreadMultiplexSourceReaderBase(Supplier,
* RecordEmitter, Configuration, SourceReaderContext)}, but accepts a
specific {@link
* FutureCompletingBlockingQueue} and {@link SingleThreadFetcherManager}.
+ *
+ * @deprecated Please use {@link
+ * #SingleThreadMultiplexSourceReaderBase(SingleThreadFetcherManager,
RecordEmitter,
+ * Configuration, SourceReaderContext)} instead.
*/
+ @Deprecated
public SingleThreadMultiplexSourceReaderBase(
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>>
elementsQueue,
SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
@@ -121,7 +128,12 @@ public abstract class
SingleThreadMultiplexSourceReaderBase<
* RecordEmitter, Configuration, SourceReaderContext)}, but accepts a
specific {@link
* FutureCompletingBlockingQueue}, {@link SingleThreadFetcherManager} and
{@link
* RecordEvaluator}.
+ *
+ * @deprecated Please use {@link
+ * #SingleThreadMultiplexSourceReaderBase(SingleThreadFetcherManager,
RecordEmitter,
+ * RecordEvaluator, Configuration, SourceReaderContext)} instead.
*/
+ @Deprecated
public SingleThreadMultiplexSourceReaderBase(
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>>
elementsQueue,
SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
@@ -137,4 +149,32 @@ public abstract class
SingleThreadMultiplexSourceReaderBase<
config,
context);
}
+
+ /**
+ * This constructor behaves like {@link
#SingleThreadMultiplexSourceReaderBase(Supplier,
+ * RecordEmitter, Configuration, SourceReaderContext)}, but accepts a
specific {@link
+ * FutureCompletingBlockingQueue} and {@link SingleThreadFetcherManager}.
+ */
+ public SingleThreadMultiplexSourceReaderBase(
+ SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
+ RecordEmitter<E, T, SplitStateT> recordEmitter,
+ Configuration config,
+ SourceReaderContext context) {
+ super(splitFetcherManager, recordEmitter, config, context);
+ }
+
+ /**
+ * This constructor behaves like {@link
#SingleThreadMultiplexSourceReaderBase(Supplier,
+ * RecordEmitter, Configuration, SourceReaderContext)}, but accepts a
specific {@link
+ * FutureCompletingBlockingQueue}, {@link SingleThreadFetcherManager} and
{@link
+ * RecordEvaluator}.
+ */
+ public SingleThreadMultiplexSourceReaderBase(
+ SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
+ RecordEmitter<E, T, SplitStateT> recordEmitter,
+ @Nullable RecordEvaluator<T> eofRecordEvaluator,
+ Configuration config,
+ SourceReaderContext context) {
+ super(splitFetcherManager, recordEmitter, eofRecordEvaluator, config,
context);
+ }
}
diff --git
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index fafec33e3af..bf14bdb6215 100644
---
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -107,6 +107,11 @@ public abstract class SourceReaderBase<E, T, SplitT
extends SourceSplit, SplitSt
@Nullable protected final RecordEvaluator<T> eofRecordEvaluator;
+ /**
+ * @deprecated Please use {@link #SourceReaderBase(SplitFetcherManager,
RecordEmitter,
+ * Configuration, SourceReaderContext)} instead.
+ */
+ @Deprecated
public SourceReaderBase(
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>>
elementsQueue,
SplitFetcherManager<E, SplitT> splitFetcherManager,
@@ -116,6 +121,11 @@ public abstract class SourceReaderBase<E, T, SplitT
extends SourceSplit, SplitSt
this(elementsQueue, splitFetcherManager, recordEmitter, null, config,
context);
}
+ /**
+ * @deprecated Please use {@link #SourceReaderBase(SplitFetcherManager,
RecordEmitter,
+ * RecordEvaluator, Configuration, SourceReaderContext)} instead.
+ */
+ @Deprecated
public SourceReaderBase(
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>>
elementsQueue,
SplitFetcherManager<E, SplitT> splitFetcherManager,
@@ -136,6 +146,39 @@ public abstract class SourceReaderBase<E, T, SplitT
extends SourceSplit, SplitSt
numRecordsInCounter =
context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
}
+ /**
+ * The primary constructor for the source reader.
+ *
+ * <p>The reader will use a handover queue sized as configured via {@link
+ * SourceReaderOptions#ELEMENT_QUEUE_CAPACITY}.
+ */
+ public SourceReaderBase(
+ SplitFetcherManager<E, SplitT> splitFetcherManager,
+ RecordEmitter<E, T, SplitStateT> recordEmitter,
+ Configuration config,
+ SourceReaderContext context) {
+ this(splitFetcherManager, recordEmitter, null, config, context);
+ }
+
+ public SourceReaderBase(
+ SplitFetcherManager<E, SplitT> splitFetcherManager,
+ RecordEmitter<E, T, SplitStateT> recordEmitter,
+ @Nullable RecordEvaluator<T> eofRecordEvaluator,
+ Configuration config,
+ SourceReaderContext context) {
+ this.elementsQueue = splitFetcherManager.getQueue();
+ this.splitFetcherManager = splitFetcherManager;
+ this.recordEmitter = recordEmitter;
+ this.splitStates = new HashMap<>();
+ this.options = new SourceReaderOptions(config);
+ this.config = config;
+ this.context = context;
+ this.noMoreSplitsAssignment = false;
+ this.eofRecordEvaluator = eofRecordEvaluator;
+
+ numRecordsInCounter =
context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
+ }
+
@Override
public void start() {}
@@ -308,6 +351,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends
SourceSplit, SplitSt
}
// -------------------- Abstract method to allow different implementations
------------------
+
/** Handles the finished splits to clean the state if needed. */
protected abstract void onSplitFinished(Map<String, SplitStateT>
finishedSplitIds);
diff --git
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
index a3ae181da8d..391885a6fbe 100644
---
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
+++
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
@@ -18,7 +18,7 @@
package org.apache.flink.connector.base.source.reader.fetcher;
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.configuration.Configuration;
@@ -41,7 +41,7 @@ import java.util.function.Supplier;
* via the same client. In the example of the file source, there is a single
thread that reads the
* files after another.
*/
-@Internal
+@PublicEvolving
public class SingleThreadFetcherManager<E, SplitT extends SourceSplit>
extends SplitFetcherManager<E, SplitT> {
@@ -53,14 +53,13 @@ public class SingleThreadFetcherManager<E, SplitT extends
SourceSplit>
* the same queue instance that is also passed to the {@link
SourceReaderBase}.
* @param splitReaderSupplier The factory for the split reader that
connects to the source
* system.
- * @deprecated Please use {@link
#SingleThreadFetcherManager(FutureCompletingBlockingQueue,
- * Supplier, Configuration)} instead.
+ * @deprecated Please use {@link #SingleThreadFetcherManager(Supplier,
Configuration)} instead.
*/
@Deprecated
public SingleThreadFetcherManager(
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>>
elementsQueue,
Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
- this(elementsQueue, splitReaderSupplier, new Configuration());
+ super(elementsQueue, splitReaderSupplier, new Configuration());
}
/**
@@ -72,7 +71,9 @@ public class SingleThreadFetcherManager<E, SplitT extends
SourceSplit>
* @param splitReaderSupplier The factory for the split reader that
connects to the source
* system.
* @param configuration The configuration to create the fetcher manager.
+ * @deprecated Please use {@link #SingleThreadFetcherManager(Supplier,
Configuration)} instead.
*/
+ @Deprecated
public SingleThreadFetcherManager(
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>>
elementsQueue,
Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
@@ -90,8 +91,11 @@ public class SingleThreadFetcherManager<E, SplitT extends
SourceSplit>
* system.
* @param configuration The configuration to create the fetcher manager.
* @param splitFinishedHook Hook for handling finished splits in split
fetchers
+ * @deprecated Please use {@link #SingleThreadFetcherManager(Supplier,
Configuration, Consumer)}
+ * instead.
*/
@VisibleForTesting
+ @Deprecated
public SingleThreadFetcherManager(
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>>
elementsQueue,
Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
@@ -100,6 +104,43 @@ public class SingleThreadFetcherManager<E, SplitT extends
SourceSplit>
super(elementsQueue, splitReaderSupplier, configuration,
splitFinishedHook);
}
+ /**
+ * Creates a new SplitFetcherManager with a single I/O threads.
+ *
+ * @param splitReaderSupplier The factory for the split reader that
connects to the source
+ * system.
+ */
+ public SingleThreadFetcherManager(Supplier<SplitReader<E, SplitT>>
splitReaderSupplier) {
+ super(splitReaderSupplier, new Configuration());
+ }
+
+ /**
+ * Creates a new SplitFetcherManager with a single I/O threads.
+ *
+ * @param splitReaderSupplier The factory for the split reader that
connects to the source
+ * system.
+ * @param configuration The configuration to create the fetcher manager.
+ */
+ public SingleThreadFetcherManager(
+ Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
Configuration configuration) {
+ super(splitReaderSupplier, configuration);
+ }
+
+ /**
+ * Creates a new SplitFetcherManager with a single I/O threads.
+ *
+ * @param splitReaderSupplier The factory for the split reader that
connects to the source
+ * system.
+ * @param configuration The configuration to create the fetcher manager.
+ * @param splitFinishedHook Hook for handling finished splits in split
fetchers
+ */
+ public SingleThreadFetcherManager(
+ Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
+ Configuration configuration,
+ Consumer<Collection<String>> splitFinishedHook) {
+ super(splitReaderSupplier, configuration, splitFinishedHook);
+ }
+
@Override
public void addSplits(List<SplitT> splitsToAdd) {
SplitFetcher<E, SplitT> fetcher = getRunningFetcher();
diff --git
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index 5e6cca3ca9e..339686415ee 100644
---
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -18,7 +18,7 @@
package org.apache.flink.connector.base.source.reader.fetcher;
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
@@ -43,7 +43,7 @@ import java.util.function.Consumer;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** The internal fetcher runnable responsible for polling message from the
external system. */
-@Internal
+@PublicEvolving
public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
private static final Logger LOG =
LoggerFactory.getLogger(SplitFetcher.class);
diff --git
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
index 594c3d168af..4f7219eb2b8 100644
---
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
+++
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
@@ -19,11 +19,13 @@
package org.apache.flink.connector.base.source.reader.fetcher;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SourceReaderBase;
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
@@ -55,7 +57,7 @@ import static
org.apache.flink.configuration.PipelineOptions.ALLOW_UNALIGNED_SOU
* manager would only start a single fetcher and assign all the splits to it.
A one-thread-per-split
* fetcher may spawn a new thread every time a new split is assigned.
*/
-@Internal
+@PublicEvolving
public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
private static final Logger LOG =
LoggerFactory.getLogger(SplitFetcherManager.class);
@@ -99,7 +101,9 @@ public abstract class SplitFetcherManager<E, SplitT extends
SourceSplit> {
* @param elementsQueue the queue that split readers will put elements
into.
* @param splitReaderFactory a supplier that could be used to create split
readers.
* @param configuration the configuration of this fetcher manager.
+ * @deprecated Please use {@link #SplitFetcherManager(Supplier,
Configuration)} instead.
*/
+ @Deprecated
public SplitFetcherManager(
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>>
elementsQueue,
Supplier<SplitReader<E, SplitT>> splitReaderFactory,
@@ -114,7 +118,10 @@ public abstract class SplitFetcherManager<E, SplitT
extends SourceSplit> {
* @param splitReaderFactory a supplier that could be used to create split
readers.
* @param configuration the configuration of this fetcher manager.
* @param splitFinishedHook Hook for handling finished splits in split
fetchers.
+ * @deprecated Please use {@link #SplitFetcherManager(Supplier,
Configuration, Consumer)}
+ * instead.
*/
+ @Deprecated
@VisibleForTesting
public SplitFetcherManager(
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>>
elementsQueue,
@@ -151,6 +158,60 @@ public abstract class SplitFetcherManager<E, SplitT
extends SourceSplit> {
this.closed = false;
}
+ /**
+ * Create a split fetcher manager.
+ *
+ * @param splitReaderFactory a supplier that could be used to create split
readers.
+ * @param configuration the configuration of this fetcher manager.
+ */
+ public SplitFetcherManager(
+ Supplier<SplitReader<E, SplitT>> splitReaderFactory, Configuration
configuration) {
+ this(splitReaderFactory, configuration, (ignore) -> {});
+ }
+
+ /**
+ * Create a split fetcher manager.
+ *
+ * @param splitReaderFactory a supplier that could be used to create split
readers.
+ * @param configuration the configuration of this fetcher manager.
+ * @param splitFinishedHook Hook for handling finished splits in split
fetchers.
+ */
+ public SplitFetcherManager(
+ Supplier<SplitReader<E, SplitT>> splitReaderFactory,
+ Configuration configuration,
+ Consumer<Collection<String>> splitFinishedHook) {
+ this.elementsQueue =
+ new FutureCompletingBlockingQueue<>(
+
configuration.get(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY));
+ this.errorHandler =
+ new Consumer<Throwable>() {
+ @Override
+ public void accept(Throwable t) {
+ LOG.error("Received uncaught exception.", t);
+ if (!uncaughtFetcherException.compareAndSet(null, t)) {
+ // Add the exception to the exception list.
+ uncaughtFetcherException.get().addSuppressed(t);
+ }
+ // Wake up the main thread to let it know the
exception.
+ elementsQueue.notifyAvailable();
+ }
+ };
+ this.splitReaderFactory = splitReaderFactory;
+ this.splitFinishedHook = splitFinishedHook;
+ this.uncaughtFetcherException = new AtomicReference<>(null);
+ this.fetcherIdGenerator = new AtomicInteger(0);
+ this.fetchers = new ConcurrentHashMap<>();
+ this.allowUnalignedSourceSplits =
configuration.get(ALLOW_UNALIGNED_SOURCE_SPLITS);
+
+ // Create the executor with a thread factory that fails the source
reader if one of
+ // the fetcher thread exits abnormally.
+ final String taskThreadName = Thread.currentThread().getName();
+ this.executors =
+ Executors.newCachedThreadPool(
+ r -> new Thread(r, "Source Data Fetcher for " +
taskThreadName));
+ this.closed = false;
+ }
+
public abstract void addSplits(List<SplitT> splitsToAdd);
public abstract void removeSplits(List<SplitT> splitsToRemove);
@@ -238,6 +299,15 @@ public abstract class SplitFetcherManager<E, SplitT
extends SourceSplit> {
return fetchers.isEmpty();
}
+ /**
+ * Return the queue contains data produced by split fetchers.This method
is Internal and only
+ * used in {@link SourceReaderBase}.
+ */
+ @Internal
+ public FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> getQueue() {
+ return elementsQueue;
+ }
+
/**
* Close the split fetcher manager.
*
diff --git
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
index 62a21d3c3f1..3a95c5b1a87 100644
---
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
+++
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
@@ -18,12 +18,12 @@
package org.apache.flink.connector.base.source.reader.fetcher;
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
import java.io.IOException;
/** An interface similar to {@link Runnable} but allows throwing exceptions
and wakeup. */
-@Internal
+@PublicEvolving
public interface SplitFetcherTask {
/**
diff --git
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
index 1853eb4439a..b7fef166a65 100644
---
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
+++
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
@@ -25,13 +25,11 @@ import
org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
import org.apache.flink.connector.base.source.reader.mocks.MockSourceReader;
import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
-import
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
import org.apache.flink.core.io.InputStatus;
@@ -330,17 +328,13 @@ public class HybridSourceReaderTest {
private CompletableFuture<Void> availabilityFuture = new
CompletableFuture<>();
public MutableFutureSourceReader(
- FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>>
elementsQueue,
Supplier<SplitReader<int[], MockSourceSplit>>
splitFetcherSupplier,
Configuration config,
SourceReaderContext context) {
- super(elementsQueue, splitFetcherSupplier, config, context);
+ super(splitFetcherSupplier, config, context);
}
public static MutableFutureSourceReader
createReader(SourceReaderContext readerContext) {
- FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>>
elementsQueue =
- new FutureCompletingBlockingQueue<>();
-
Configuration config = new Configuration();
config.set(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 2);
config.set(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT,
30000L);
@@ -348,8 +342,7 @@ public class HybridSourceReaderTest {
MockSplitReader.newBuilder()
.setNumRecordsPerSplitPerFetch(2)
.setBlockingFetch(true);
- return new MutableFutureSourceReader(
- elementsQueue, builder::build, config, readerContext);
+ return new MutableFutureSourceReader(builder::build, config,
readerContext);
}
@Override
diff --git
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
index 9096bac02ac..1cac47c9dec 100644
---
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
+++
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
@@ -36,7 +36,6 @@ import
org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit;
import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
-import
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.testutils.source.reader.SourceReaderTestBase;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
@@ -82,13 +81,10 @@ public class SourceReaderBaseTest extends
SourceReaderTestBase<MockSourceSplit>
() -> {
final String errMsg = "Testing Exception";
-
FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>>
- elementsQueue = new
FutureCompletingBlockingQueue<>();
// We have to handle split changes first,
otherwise fetch will not be
// called.
try (MockSourceReader reader =
new MockSourceReader(
- elementsQueue,
() ->
new SplitReader<int[],
MockSourceSplit>() {
@Override
@@ -160,8 +156,6 @@ public class SourceReaderBaseTest extends
SourceReaderTestBase<MockSourceSplit>
@Test
void testMultipleSplitsWithDifferentFinishingMoments() throws Exception {
- FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>>
elementsQueue =
- new FutureCompletingBlockingQueue<>();
MockSplitReader mockSplitReader =
MockSplitReader.newBuilder()
.setNumRecordsPerSplitPerFetch(2)
@@ -170,10 +164,7 @@ public class SourceReaderBaseTest extends
SourceReaderTestBase<MockSourceSplit>
.build();
MockSourceReader reader =
new MockSourceReader(
- elementsQueue,
- () -> mockSplitReader,
- getConfig(),
- new TestingReaderContext());
+ () -> mockSplitReader, getConfig(), new
TestingReaderContext());
reader.start();
@@ -196,8 +187,6 @@ public class SourceReaderBaseTest extends
SourceReaderTestBase<MockSourceSplit>
@Test
void testMultipleSplitsWithSeparatedFinishedRecord() throws Exception {
- FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>>
elementsQueue =
- new FutureCompletingBlockingQueue<>();
MockSplitReader mockSplitReader =
MockSplitReader.newBuilder()
.setNumRecordsPerSplitPerFetch(2)
@@ -206,10 +195,7 @@ public class SourceReaderBaseTest extends
SourceReaderTestBase<MockSourceSplit>
.build();
MockSourceReader reader =
new MockSourceReader(
- elementsQueue,
- () -> mockSplitReader,
- getConfig(),
- new TestingReaderContext());
+ () -> mockSplitReader, getConfig(), new
TestingReaderContext());
reader.start();
@@ -234,22 +220,15 @@ public class SourceReaderBaseTest extends
SourceReaderTestBase<MockSourceSplit>
void
testPollNextReturnMoreAvailableWhenAllSplitFetcherCloseWithLeftoverElementInQueue()
throws Exception {
- FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>>
elementsQueue =
- new FutureCompletingBlockingQueue<>();
MockSplitReader mockSplitReader =
MockSplitReader.newBuilder()
.setNumRecordsPerSplitPerFetch(1)
.setBlockingFetch(true)
.build();
BlockingShutdownSplitFetcherManager<int[], MockSourceSplit>
splitFetcherManager =
- new BlockingShutdownSplitFetcherManager<>(
- elementsQueue, () -> mockSplitReader, getConfig());
+ new BlockingShutdownSplitFetcherManager<>(() ->
mockSplitReader, getConfig());
final MockSourceReader sourceReader =
- new MockSourceReader(
- elementsQueue,
- splitFetcherManager,
- getConfig(),
- new TestingReaderContext());
+ new MockSourceReader(splitFetcherManager, getConfig(), new
TestingReaderContext());
// Create and add a split that only contains one record
final MockSourceSplit split = new MockSourceSplit(0, 0, 1);
@@ -273,10 +252,7 @@ public class SourceReaderBaseTest extends
SourceReaderTestBase<MockSourceSplit>
MockSourceReader reader =
new MockSourceReader(
- new FutureCompletingBlockingQueue<>(),
- () -> mockSplitReader,
- new Configuration(),
- new TestingReaderContext());
+ () -> mockSplitReader, new Configuration(), new
TestingReaderContext());
SourceOperator<Integer, MockSourceSplit> sourceOperator =
createTestOperator(
@@ -343,8 +319,6 @@ public class SourceReaderBaseTest extends
SourceReaderTestBase<MockSourceSplit>
void testMultipleSplitsAndFinishedByRecordEvaluator() throws Exception {
int split0End = 7;
int split1End = 15;
- FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>>
elementsQueue =
- new FutureCompletingBlockingQueue<>();
MockSplitReader mockSplitReader =
MockSplitReader.newBuilder()
.setNumRecordsPerSplitPerFetch(2)
@@ -353,9 +327,7 @@ public class SourceReaderBaseTest extends
SourceReaderTestBase<MockSourceSplit>
.build();
MockSourceReader reader =
new MockSourceReader(
- elementsQueue,
- new SingleThreadFetcherManager<>(
- elementsQueue, () -> mockSplitReader,
getConfig()),
+ new SingleThreadFetcherManager<>(() ->
mockSplitReader, getConfig()),
getConfig(),
new TestingReaderContext(),
i -> i == split0End || i == split1End);
@@ -392,15 +364,12 @@ public class SourceReaderBaseTest extends
SourceReaderTestBase<MockSourceSplit>
@Override
protected MockSourceReader createReader() {
- FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>>
elementsQueue =
- new FutureCompletingBlockingQueue<>();
MockSplitReader mockSplitReader =
MockSplitReader.newBuilder()
.setNumRecordsPerSplitPerFetch(2)
.setBlockingFetch(true)
.build();
- return new MockSourceReader(
- elementsQueue, () -> mockSplitReader, getConfig(), new
TestingReaderContext());
+ return new MockSourceReader(() -> mockSplitReader, getConfig(), new
TestingReaderContext());
}
@Override
@@ -446,13 +415,9 @@ public class SourceReaderBaseTest extends
SourceReaderTestBase<MockSourceSplit>
private static <E> SourceReader<E, ?> createReaderAndAwaitAvailable(
final String splitId, final RecordsWithSplitIds<E> records) throws
Exception {
- final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>>
elementsQueue =
- new FutureCompletingBlockingQueue<>();
-
final SourceReader<E, TestingSourceSplit> reader =
new SingleThreadMultiplexSourceReaderBase<
E, E, TestingSourceSplit, TestingSourceSplit>(
- elementsQueue,
() -> new TestingSplitReader<>(records),
new PassThroughRecordEmitter<>(),
new Configuration(),
@@ -489,6 +454,7 @@ public class SourceReaderBaseTest extends
SourceReaderTestBase<MockSourceSplit>
}
// ------------------ Test helper classes -------------------
+
/**
* When maybeShutdownFinishedFetchers is invoke,
BlockingShutdownSplitFetcherManager will
* complete the inShutdownSplitFetcherFuture and ensures that all the
split fetchers are
@@ -500,10 +466,8 @@ public class SourceReaderBaseTest extends
SourceReaderTestBase<MockSourceSplit>
private final CompletableFuture<Void> inShutdownSplitFetcherFuture;
public BlockingShutdownSplitFetcherManager(
- FutureCompletingBlockingQueue<RecordsWithSplitIds<E>>
elementsQueue,
- Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
- Configuration configuration) {
- super(elementsQueue, splitReaderSupplier, configuration);
+ Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
Configuration configuration) {
+ super(splitReaderSupplier, configuration);
this.inShutdownSplitFetcherFuture = new CompletableFuture<>();
}
diff --git
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
index 11cd62bbf4d..08e7ac45df9 100644
---
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
+++
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.connector.base.source.reader.fetcher;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
import
org.apache.flink.connector.base.source.reader.mocks.TestingRecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit;
import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
@@ -61,7 +62,7 @@ public class SplitFetcherManagerTest {
TestingSplitReader<Object, TestingSourceSplit> reader = new
TestingSplitReader<>();
reader.setCloseWithException();
SplitFetcherManager<Object, TestingSourceSplit> fetcherManager =
- createFetcher("test-split", new
FutureCompletingBlockingQueue<>(), reader);
+ createFetcher("test-split", reader, new Configuration());
fetcherManager.close(1000L);
assertThatThrownBy(fetcherManager::checkErrors)
.hasRootCauseMessage("Artificial exception on closing the
split reader.");
@@ -74,21 +75,21 @@ public class SplitFetcherManagerTest {
final RecordsWithSplitIds<Integer>... fetchesBeforeError) throws
Exception {
final IOException testingException = new IOException("test");
- final FutureCompletingBlockingQueue<RecordsWithSplitIds<Integer>>
queue =
- new FutureCompletingBlockingQueue<>(10);
final AwaitingReader<Integer, TestingSourceSplit> reader =
new AwaitingReader<>(testingException, fetchesBeforeError);
+ final Configuration configuration = new Configuration();
+ configuration.set(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 10);
final SplitFetcherManager<Integer, TestingSourceSplit> fetcher =
- createFetcher("testSplit", queue, reader);
+ createFetcher("testSplit", reader, configuration);
reader.awaitAllRecordsReturned();
- drainQueue(queue);
+ drainQueue(fetcher.getQueue());
- assertThat(queue.getAvailabilityFuture().isDone()).isFalse();
+
assertThat(fetcher.getQueue().getAvailabilityFuture().isDone()).isFalse();
reader.triggerThrowException();
// await the error propagation
- queue.getAvailabilityFuture().get();
+ fetcher.getQueue().getAvailabilityFuture().get();
try {
fetcher.checkErrors();
@@ -106,11 +107,11 @@ public class SplitFetcherManagerTest {
private static <E> SplitFetcherManager<E, TestingSourceSplit>
createFetcher(
final String splitId,
- final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> queue,
- final SplitReader<E, TestingSourceSplit> reader) {
+ final SplitReader<E, TestingSourceSplit> reader,
+ final Configuration configuration) {
final SingleThreadFetcherManager<E, TestingSourceSplit> fetcher =
- new SingleThreadFetcherManager<>(queue, () -> reader, new
Configuration());
+ new SingleThreadFetcherManager<>(() -> reader, configuration);
fetcher.addSplits(Collections.singletonList(new
TestingSourceSplit(splitId)));
return fetcher;
}
diff --git
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherPauseResumeSplitReaderTest.java
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherPauseResumeSplitReaderTest.java
index f69fa03f9e4..6487802ae30 100644
---
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherPauseResumeSplitReaderTest.java
+++
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherPauseResumeSplitReaderTest.java
@@ -22,11 +22,10 @@ import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
import org.apache.flink.connector.base.source.reader.mocks.MockSourceReader;
import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
-import
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
import org.apache.flink.core.io.InputStatus;
@@ -157,10 +156,8 @@ public class SplitFetcherPauseResumeSplitReaderTest {
extends SingleThreadFetcherManager<E, SplitT> {
public MockSteppingSplitFetcherManager(
- FutureCompletingBlockingQueue<RecordsWithSplitIds<E>>
elementsQueue,
- Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
- Configuration configuration) {
- super(elementsQueue, splitReaderSupplier, configuration);
+ Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
Configuration configuration) {
+ super(splitReaderSupplier, configuration);
}
@Override
@@ -215,14 +212,11 @@ public class SplitFetcherPauseResumeSplitReaderTest {
public SteppingSourceReaderTestHarness(
Supplier<SplitReader<int[], MockSourceSplit>>
splitReaderSupplier,
Configuration configuration) {
- FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> queue =
- new FutureCompletingBlockingQueue<>(10);
+ configuration.set(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 10);
this.fetcherManager =
- new MockSteppingSplitFetcherManager<>(
- queue, splitReaderSupplier, configuration);
+ new MockSteppingSplitFetcherManager<>(splitReaderSupplier,
configuration);
this.sourceReader =
- new MockSourceReader(
- queue, fetcherManager, configuration, new
TestingReaderContext());
+ new MockSourceReader(fetcherManager, configuration, new
TestingReaderContext());
}
private static List<MockSourceSplit> createPrefilledSplits(int
numSplits, int numRecords) {
diff --git
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
index 11bf88f0f37..5193b9358b5 100644
---
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
+++
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
@@ -27,9 +27,7 @@ import
org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
-import
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.util.InstantiationUtil;
@@ -66,8 +64,6 @@ public class MockBaseSource implements Source<Integer,
MockSourceSplit, List<Moc
@Override
public SourceReader<Integer, MockSourceSplit>
createReader(SourceReaderContext readerContext) {
- FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>>
elementsQueue =
- new FutureCompletingBlockingQueue<>();
Configuration config = new Configuration();
config.set(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1);
@@ -76,7 +72,7 @@ public class MockBaseSource implements Source<Integer,
MockSourceSplit, List<Moc
MockSplitReader.newBuilder()
.setNumRecordsPerSplitPerFetch(2)
.setBlockingFetch(true);
- return new MockSourceReader(elementsQueue, builder::build, config,
readerContext);
+ return new MockSourceReader(builder::build, config, readerContext);
}
@Override
diff --git
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
index 79cf9a82aa2..87bc7810c31 100644
---
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
+++
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
@@ -22,11 +22,9 @@ import
org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEvaluator;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
-import
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import java.util.Map;
import java.util.function.Supplier;
@@ -37,25 +35,17 @@ public class MockSourceReader
int[], Integer, MockSourceSplit, MockSplitState> {
public MockSourceReader(
- FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>>
elementsQueue,
Supplier<SplitReader<int[], MockSourceSplit>> splitFetcherSupplier,
Configuration config,
SourceReaderContext context) {
- super(
- elementsQueue,
- splitFetcherSupplier,
- new MockRecordEmitter(context.metricGroup()),
- config,
- context);
+ super(splitFetcherSupplier, new
MockRecordEmitter(context.metricGroup()), config, context);
}
public MockSourceReader(
- FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>>
elementsQueue,
SingleThreadFetcherManager<int[], MockSourceSplit>
splitSplitFetcherManager,
Configuration config,
SourceReaderContext context) {
super(
- elementsQueue,
splitSplitFetcherManager,
new MockRecordEmitter(context.metricGroup()),
config,
@@ -63,13 +53,11 @@ public class MockSourceReader
}
public MockSourceReader(
- FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>>
elementsQueue,
SingleThreadFetcherManager<int[], MockSourceSplit>
splitSplitFetcherManager,
Configuration config,
SourceReaderContext context,
RecordEvaluator<Integer> recordEvaluator) {
super(
- elementsQueue,
splitSplitFetcherManager,
new MockRecordEmitter(context.metricGroup()),
recordEvaluator,