This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6afee1de658 [FLINK-34196][Connectors][FLIP-389] Annotate 
SingleThreadFetcherManager as PublicEvolving
6afee1de658 is described below

commit 6afee1de6585074e0df6205f1f52bb239dcf4a77
Author: Hongshun Wang <[email protected]>
AuthorDate: Wed Jan 17 17:02:19 2024 +0800

    [FLINK-34196][Connectors][FLIP-389] Annotate SingleThreadFetcherManager as 
PublicEvolving
---
 .../SingleThreadMultiplexSourceReaderBase.java     | 48 +++++++++++++--
 .../base/source/reader/SourceReaderBase.java       | 44 +++++++++++++
 .../reader/fetcher/SingleThreadFetcherManager.java | 51 +++++++++++++--
 .../base/source/reader/fetcher/SplitFetcher.java   |  4 +-
 .../source/reader/fetcher/SplitFetcherManager.java | 72 +++++++++++++++++++++-
 .../source/reader/fetcher/SplitFetcherTask.java    |  4 +-
 .../base/source/hybrid/HybridSourceReaderTest.java | 11 +---
 .../base/source/reader/SourceReaderBaseTest.java   | 56 +++--------------
 .../reader/fetcher/SplitFetcherManagerTest.java    | 21 ++++---
 .../SplitFetcherPauseResumeSplitReaderTest.java    | 18 ++----
 .../base/source/reader/mocks/MockBaseSource.java   |  6 +-
 .../base/source/reader/mocks/MockSourceReader.java | 14 +----
 12 files changed, 240 insertions(+), 109 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
index 72cb5e2f8f5..f24fd3879f2 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
@@ -74,10 +74,8 @@ public abstract class SingleThreadMultiplexSourceReaderBase<
             RecordEmitter<E, T, SplitStateT> recordEmitter,
             Configuration config,
             SourceReaderContext context) {
-        this(
-                new FutureCompletingBlockingQueue<>(
-                        
config.get(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)),
-                splitReaderSupplier,
+        super(
+                new SingleThreadFetcherManager<>(splitReaderSupplier, config),
                 recordEmitter,
                 config,
                 context);
@@ -87,7 +85,11 @@ public abstract class SingleThreadMultiplexSourceReaderBase<
      * This constructor behaves like {@link 
#SingleThreadMultiplexSourceReaderBase(Supplier,
      * RecordEmitter, Configuration, SourceReaderContext)}, but accepts a 
specific {@link
      * FutureCompletingBlockingQueue}.
+     *
+     * @deprecated Please use {@link 
#SingleThreadMultiplexSourceReaderBase(Supplier, RecordEmitter,
+     *     Configuration, SourceReaderContext)} instead.
      */
+    @Deprecated
     public SingleThreadMultiplexSourceReaderBase(
             FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
             Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
@@ -106,7 +108,12 @@ public abstract class 
SingleThreadMultiplexSourceReaderBase<
      * This constructor behaves like {@link 
#SingleThreadMultiplexSourceReaderBase(Supplier,
      * RecordEmitter, Configuration, SourceReaderContext)}, but accepts a 
specific {@link
      * FutureCompletingBlockingQueue} and {@link SingleThreadFetcherManager}.
+     *
+     * @deprecated Please use {@link
+     *     #SingleThreadMultiplexSourceReaderBase(SingleThreadFetcherManager, 
RecordEmitter,
+     *     Configuration, SourceReaderContext)} instead.
      */
+    @Deprecated
     public SingleThreadMultiplexSourceReaderBase(
             FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
             SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
@@ -121,7 +128,12 @@ public abstract class 
SingleThreadMultiplexSourceReaderBase<
      * RecordEmitter, Configuration, SourceReaderContext)}, but accepts a 
specific {@link
      * FutureCompletingBlockingQueue}, {@link SingleThreadFetcherManager} and 
{@link
      * RecordEvaluator}.
+     *
+     * @deprecated Please use {@link
+     *     #SingleThreadMultiplexSourceReaderBase(SingleThreadFetcherManager, 
RecordEmitter,
+     *     RecordEvaluator, Configuration, SourceReaderContext)} instead.
      */
+    @Deprecated
     public SingleThreadMultiplexSourceReaderBase(
             FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
             SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
@@ -137,4 +149,32 @@ public abstract class 
SingleThreadMultiplexSourceReaderBase<
                 config,
                 context);
     }
+
+    /**
+     * This constructor behaves like {@link 
#SingleThreadMultiplexSourceReaderBase(Supplier,
+     * RecordEmitter, Configuration, SourceReaderContext)}, but accepts a 
specific {@link
+     * SingleThreadFetcherManager}.
+     */
+    public SingleThreadMultiplexSourceReaderBase(
+            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
+            RecordEmitter<E, T, SplitStateT> recordEmitter,
+            Configuration config,
+            SourceReaderContext context) {
+        super(splitFetcherManager, recordEmitter, config, context);
+    }
+
+    /**
+     * This constructor behaves like {@link 
#SingleThreadMultiplexSourceReaderBase(Supplier,
+     * RecordEmitter, Configuration, SourceReaderContext)}, but accepts a 
specific {@link
+     * SingleThreadFetcherManager} and 
{@link
+     * RecordEvaluator}.
+     */
+    public SingleThreadMultiplexSourceReaderBase(
+            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
+            RecordEmitter<E, T, SplitStateT> recordEmitter,
+            @Nullable RecordEvaluator<T> eofRecordEvaluator,
+            Configuration config,
+            SourceReaderContext context) {
+        super(splitFetcherManager, recordEmitter, eofRecordEvaluator, config, 
context);
+    }
 }
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index fafec33e3af..bf14bdb6215 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -107,6 +107,11 @@ public abstract class SourceReaderBase<E, T, SplitT 
extends SourceSplit, SplitSt
 
     @Nullable protected final RecordEvaluator<T> eofRecordEvaluator;
 
+    /**
+     * @deprecated Please use {@link #SourceReaderBase(SplitFetcherManager, 
RecordEmitter,
+     *     Configuration, SourceReaderContext)} instead.
+     */
+    @Deprecated
     public SourceReaderBase(
             FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
             SplitFetcherManager<E, SplitT> splitFetcherManager,
@@ -116,6 +121,11 @@ public abstract class SourceReaderBase<E, T, SplitT 
extends SourceSplit, SplitSt
         this(elementsQueue, splitFetcherManager, recordEmitter, null, config, 
context);
     }
 
+    /**
+     * @deprecated Please use {@link #SourceReaderBase(SplitFetcherManager, 
RecordEmitter,
+     *     RecordEvaluator, Configuration, SourceReaderContext)} instead.
+     */
+    @Deprecated
     public SourceReaderBase(
             FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
             SplitFetcherManager<E, SplitT> splitFetcherManager,
@@ -136,6 +146,39 @@ public abstract class SourceReaderBase<E, T, SplitT 
extends SourceSplit, SplitSt
         numRecordsInCounter = 
context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
     }
 
+    /**
+     * The primary constructor for the source reader.
+     *
+     * <p>The reader will use a handover queue sized as configured via {@link
+     * SourceReaderOptions#ELEMENT_QUEUE_CAPACITY}.
+     */
+    public SourceReaderBase(
+            SplitFetcherManager<E, SplitT> splitFetcherManager,
+            RecordEmitter<E, T, SplitStateT> recordEmitter,
+            Configuration config,
+            SourceReaderContext context) {
+        this(splitFetcherManager, recordEmitter, null, config, context);
+    }
+
+    public SourceReaderBase(
+            SplitFetcherManager<E, SplitT> splitFetcherManager,
+            RecordEmitter<E, T, SplitStateT> recordEmitter,
+            @Nullable RecordEvaluator<T> eofRecordEvaluator,
+            Configuration config,
+            SourceReaderContext context) {
+        this.elementsQueue = splitFetcherManager.getQueue();
+        this.splitFetcherManager = splitFetcherManager;
+        this.recordEmitter = recordEmitter;
+        this.splitStates = new HashMap<>();
+        this.options = new SourceReaderOptions(config);
+        this.config = config;
+        this.context = context;
+        this.noMoreSplitsAssignment = false;
+        this.eofRecordEvaluator = eofRecordEvaluator;
+
+        numRecordsInCounter = 
context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
+    }
+
     @Override
     public void start() {}
 
@@ -308,6 +351,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends 
SourceSplit, SplitSt
     }
 
     // -------------------- Abstract method to allow different implementations 
------------------
+
     /** Handles the finished splits to clean the state if needed. */
     protected abstract void onSplitFinished(Map<String, SplitStateT> 
finishedSplitIds);
 
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
index a3ae181da8d..391885a6fbe 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.connector.base.source.reader.fetcher;
 
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.configuration.Configuration;
@@ -41,7 +41,7 @@ import java.util.function.Supplier;
  * via the same client. In the example of the file source, there is a single 
thread that reads the
  * files after another.
  */
-@Internal
+@PublicEvolving
 public class SingleThreadFetcherManager<E, SplitT extends SourceSplit>
         extends SplitFetcherManager<E, SplitT> {
 
@@ -53,14 +53,13 @@ public class SingleThreadFetcherManager<E, SplitT extends 
SourceSplit>
      *     the same queue instance that is also passed to the {@link 
SourceReaderBase}.
      * @param splitReaderSupplier The factory for the split reader that 
connects to the source
      *     system.
-     * @deprecated Please use {@link 
#SingleThreadFetcherManager(FutureCompletingBlockingQueue,
-     *     Supplier, Configuration)} instead.
+     * @deprecated Please use {@link #SingleThreadFetcherManager(Supplier, 
Configuration)} instead.
      */
     @Deprecated
     public SingleThreadFetcherManager(
             FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
             Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
-        this(elementsQueue, splitReaderSupplier, new Configuration());
+        super(elementsQueue, splitReaderSupplier, new Configuration());
     }
 
     /**
@@ -72,7 +71,9 @@ public class SingleThreadFetcherManager<E, SplitT extends 
SourceSplit>
      * @param splitReaderSupplier The factory for the split reader that 
connects to the source
      *     system.
      * @param configuration The configuration to create the fetcher manager.
+     * @deprecated Please use {@link #SingleThreadFetcherManager(Supplier, 
Configuration)} instead.
      */
+    @Deprecated
     public SingleThreadFetcherManager(
             FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
             Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
@@ -90,8 +91,11 @@ public class SingleThreadFetcherManager<E, SplitT extends 
SourceSplit>
      *     system.
      * @param configuration The configuration to create the fetcher manager.
      * @param splitFinishedHook Hook for handling finished splits in split 
fetchers
+     * @deprecated Please use {@link #SingleThreadFetcherManager(Supplier, 
Configuration, Consumer)}
+     *     instead.
      */
     @VisibleForTesting
+    @Deprecated
     public SingleThreadFetcherManager(
             FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
             Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
@@ -100,6 +104,43 @@ public class SingleThreadFetcherManager<E, SplitT extends 
SourceSplit>
         super(elementsQueue, splitReaderSupplier, configuration, 
splitFinishedHook);
     }
 
+    /**
+     * Creates a new SplitFetcherManager with a single I/O thread.
+     *
+     * @param splitReaderSupplier The factory for the split reader that 
connects to the source
+     *     system.
+     */
+    public SingleThreadFetcherManager(Supplier<SplitReader<E, SplitT>> 
splitReaderSupplier) {
+        super(splitReaderSupplier, new Configuration());
+    }
+
+    /**
+     * Creates a new SplitFetcherManager with a single I/O thread.
+     *
+     * @param splitReaderSupplier The factory for the split reader that 
connects to the source
+     *     system.
+     * @param configuration The configuration to create the fetcher manager.
+     */
+    public SingleThreadFetcherManager(
+            Supplier<SplitReader<E, SplitT>> splitReaderSupplier, 
Configuration configuration) {
+        super(splitReaderSupplier, configuration);
+    }
+
+    /**
+     * Creates a new SplitFetcherManager with a single I/O thread.
+     *
+     * @param splitReaderSupplier The factory for the split reader that 
connects to the source
+     *     system.
+     * @param configuration The configuration to create the fetcher manager.
+     * @param splitFinishedHook Hook for handling finished splits in split 
fetchers
+     */
+    public SingleThreadFetcherManager(
+            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
+            Configuration configuration,
+            Consumer<Collection<String>> splitFinishedHook) {
+        super(splitReaderSupplier, configuration, splitFinishedHook);
+    }
+
     @Override
     public void addSplits(List<SplitT> splitsToAdd) {
         SplitFetcher<E, SplitT> fetcher = getRunningFetcher();
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index 5e6cca3ca9e..339686415ee 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.connector.base.source.reader.fetcher;
 
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
@@ -43,7 +43,7 @@ import java.util.function.Consumer;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** The internal fetcher runnable responsible for polling message from the 
external system. */
-@Internal
+@PublicEvolving
 public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
     private static final Logger LOG = 
LoggerFactory.getLogger(SplitFetcher.class);
 
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
index 594c3d168af..4f7219eb2b8 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
@@ -19,11 +19,13 @@
 package org.apache.flink.connector.base.source.reader.fetcher;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.SourceReaderBase;
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 
@@ -55,7 +57,7 @@ import static 
org.apache.flink.configuration.PipelineOptions.ALLOW_UNALIGNED_SOU
  * manager would only start a single fetcher and assign all the splits to it. 
A one-thread-per-split
  * fetcher may spawn a new thread every time a new split is assigned.
  */
-@Internal
+@PublicEvolving
 public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
     private static final Logger LOG = 
LoggerFactory.getLogger(SplitFetcherManager.class);
 
@@ -99,7 +101,9 @@ public abstract class SplitFetcherManager<E, SplitT extends 
SourceSplit> {
      * @param elementsQueue the queue that split readers will put elements 
into.
      * @param splitReaderFactory a supplier that could be used to create split 
readers.
      * @param configuration the configuration of this fetcher manager.
+     * @deprecated Please use {@link #SplitFetcherManager(Supplier, 
Configuration)} instead.
      */
+    @Deprecated
     public SplitFetcherManager(
             FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
             Supplier<SplitReader<E, SplitT>> splitReaderFactory,
@@ -114,7 +118,10 @@ public abstract class SplitFetcherManager<E, SplitT 
extends SourceSplit> {
      * @param splitReaderFactory a supplier that could be used to create split 
readers.
      * @param configuration the configuration of this fetcher manager.
      * @param splitFinishedHook Hook for handling finished splits in split 
fetchers.
+     * @deprecated Please use {@link #SplitFetcherManager(Supplier, 
Configuration, Consumer)}
+     *     instead.
      */
+    @Deprecated
     @VisibleForTesting
     public SplitFetcherManager(
             FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
@@ -151,6 +158,60 @@ public abstract class SplitFetcherManager<E, SplitT 
extends SourceSplit> {
         this.closed = false;
     }
 
+    /**
+     * Create a split fetcher manager.
+     *
+     * @param splitReaderFactory a supplier that could be used to create split 
readers.
+     * @param configuration the configuration of this fetcher manager.
+     */
+    public SplitFetcherManager(
+            Supplier<SplitReader<E, SplitT>> splitReaderFactory, Configuration 
configuration) {
+        this(splitReaderFactory, configuration, (ignore) -> {});
+    }
+
+    /**
+     * Create a split fetcher manager.
+     *
+     * @param splitReaderFactory a supplier that could be used to create split 
readers.
+     * @param configuration the configuration of this fetcher manager.
+     * @param splitFinishedHook Hook for handling finished splits in split 
fetchers.
+     */
+    public SplitFetcherManager(
+            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
+            Configuration configuration,
+            Consumer<Collection<String>> splitFinishedHook) {
+        this.elementsQueue =
+                new FutureCompletingBlockingQueue<>(
+                        
configuration.get(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY));
+        this.errorHandler =
+                new Consumer<Throwable>() {
+                    @Override
+                    public void accept(Throwable t) {
+                        LOG.error("Received uncaught exception.", t);
+                        if (!uncaughtFetcherException.compareAndSet(null, t)) {
+                            // Add the exception to the exception list.
+                            uncaughtFetcherException.get().addSuppressed(t);
+                        }
+                        // Wake up the main thread to let it know the 
exception.
+                        elementsQueue.notifyAvailable();
+                    }
+                };
+        this.splitReaderFactory = splitReaderFactory;
+        this.splitFinishedHook = splitFinishedHook;
+        this.uncaughtFetcherException = new AtomicReference<>(null);
+        this.fetcherIdGenerator = new AtomicInteger(0);
+        this.fetchers = new ConcurrentHashMap<>();
+        this.allowUnalignedSourceSplits = 
configuration.get(ALLOW_UNALIGNED_SOURCE_SPLITS);
+
+        // Create the executor with a thread factory that fails the source 
reader if one of
+        // the fetcher threads exits abnormally.
+        final String taskThreadName = Thread.currentThread().getName();
+        this.executors =
+                Executors.newCachedThreadPool(
+                        r -> new Thread(r, "Source Data Fetcher for " + 
taskThreadName));
+        this.closed = false;
+    }
+
     public abstract void addSplits(List<SplitT> splitsToAdd);
 
     public abstract void removeSplits(List<SplitT> splitsToRemove);
@@ -238,6 +299,15 @@ public abstract class SplitFetcherManager<E, SplitT 
extends SourceSplit> {
         return fetchers.isEmpty();
     }
 
+    /**
+     * Returns the queue that contains data produced by split fetchers. This method 
is internal and only
+     * used in {@link SourceReaderBase}.
+     */
+    @Internal
+    public FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> getQueue() {
+        return elementsQueue;
+    }
+
     /**
      * Close the split fetcher manager.
      *
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
index 62a21d3c3f1..3a95c5b1a87 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.connector.base.source.reader.fetcher;
 
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
 
 import java.io.IOException;
 
 /** An interface similar to {@link Runnable} but allows throwing exceptions 
and wakeup. */
-@Internal
+@PublicEvolving
 public interface SplitFetcherTask {
 
     /**
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
index 1853eb4439a..b7fef166a65 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java
@@ -25,13 +25,11 @@ import 
org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.mocks.MockSource;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
 import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
 import org.apache.flink.connector.base.source.reader.mocks.MockSourceReader;
 import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
-import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
 import org.apache.flink.core.io.InputStatus;
@@ -330,17 +328,13 @@ public class HybridSourceReaderTest {
         private CompletableFuture<Void> availabilityFuture = new 
CompletableFuture<>();
 
         public MutableFutureSourceReader(
-                FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> 
elementsQueue,
                 Supplier<SplitReader<int[], MockSourceSplit>> 
splitFetcherSupplier,
                 Configuration config,
                 SourceReaderContext context) {
-            super(elementsQueue, splitFetcherSupplier, config, context);
+            super(splitFetcherSupplier, config, context);
         }
 
         public static MutableFutureSourceReader 
createReader(SourceReaderContext readerContext) {
-            FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> 
elementsQueue =
-                    new FutureCompletingBlockingQueue<>();
-
             Configuration config = new Configuration();
             config.set(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 2);
             config.set(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 
30000L);
@@ -348,8 +342,7 @@ public class HybridSourceReaderTest {
                     MockSplitReader.newBuilder()
                             .setNumRecordsPerSplitPerFetch(2)
                             .setBlockingFetch(true);
-            return new MutableFutureSourceReader(
-                    elementsQueue, builder::build, config, readerContext);
+            return new MutableFutureSourceReader(builder::build, config, 
readerContext);
         }
 
         @Override
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
index 9096bac02ac..1cac47c9dec 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
@@ -36,7 +36,6 @@ import 
org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit;
 import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
-import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.connector.testutils.source.reader.SourceReaderTestBase;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
@@ -82,13 +81,10 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
                         () -> {
                             final String errMsg = "Testing Exception";
 
-                            
FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>>
-                                    elementsQueue = new 
FutureCompletingBlockingQueue<>();
                             // We have to handle split changes first, 
otherwise fetch will not be
                             // called.
                             try (MockSourceReader reader =
                                     new MockSourceReader(
-                                            elementsQueue,
                                             () ->
                                                     new SplitReader<int[], 
MockSourceSplit>() {
                                                         @Override
@@ -160,8 +156,6 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
 
     @Test
     void testMultipleSplitsWithDifferentFinishingMoments() throws Exception {
-        FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> 
elementsQueue =
-                new FutureCompletingBlockingQueue<>();
         MockSplitReader mockSplitReader =
                 MockSplitReader.newBuilder()
                         .setNumRecordsPerSplitPerFetch(2)
@@ -170,10 +164,7 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
                         .build();
         MockSourceReader reader =
                 new MockSourceReader(
-                        elementsQueue,
-                        () -> mockSplitReader,
-                        getConfig(),
-                        new TestingReaderContext());
+                        () -> mockSplitReader, getConfig(), new 
TestingReaderContext());
 
         reader.start();
 
@@ -196,8 +187,6 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
 
     @Test
     void testMultipleSplitsWithSeparatedFinishedRecord() throws Exception {
-        FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> 
elementsQueue =
-                new FutureCompletingBlockingQueue<>();
         MockSplitReader mockSplitReader =
                 MockSplitReader.newBuilder()
                         .setNumRecordsPerSplitPerFetch(2)
@@ -206,10 +195,7 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
                         .build();
         MockSourceReader reader =
                 new MockSourceReader(
-                        elementsQueue,
-                        () -> mockSplitReader,
-                        getConfig(),
-                        new TestingReaderContext());
+                        () -> mockSplitReader, getConfig(), new 
TestingReaderContext());
 
         reader.start();
 
@@ -234,22 +220,15 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
     void 
testPollNextReturnMoreAvailableWhenAllSplitFetcherCloseWithLeftoverElementInQueue()
             throws Exception {
 
-        FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> 
elementsQueue =
-                new FutureCompletingBlockingQueue<>();
         MockSplitReader mockSplitReader =
                 MockSplitReader.newBuilder()
                         .setNumRecordsPerSplitPerFetch(1)
                         .setBlockingFetch(true)
                         .build();
         BlockingShutdownSplitFetcherManager<int[], MockSourceSplit> 
splitFetcherManager =
-                new BlockingShutdownSplitFetcherManager<>(
-                        elementsQueue, () -> mockSplitReader, getConfig());
+                new BlockingShutdownSplitFetcherManager<>(() -> 
mockSplitReader, getConfig());
         final MockSourceReader sourceReader =
-                new MockSourceReader(
-                        elementsQueue,
-                        splitFetcherManager,
-                        getConfig(),
-                        new TestingReaderContext());
+                new MockSourceReader(splitFetcherManager, getConfig(), new 
TestingReaderContext());
 
         // Create and add a split that only contains one record
         final MockSourceSplit split = new MockSourceSplit(0, 0, 1);
@@ -273,10 +252,7 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
 
         MockSourceReader reader =
                 new MockSourceReader(
-                        new FutureCompletingBlockingQueue<>(),
-                        () -> mockSplitReader,
-                        new Configuration(),
-                        new TestingReaderContext());
+                        () -> mockSplitReader, new Configuration(), new 
TestingReaderContext());
 
         SourceOperator<Integer, MockSourceSplit> sourceOperator =
                 createTestOperator(
@@ -343,8 +319,6 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
     void testMultipleSplitsAndFinishedByRecordEvaluator() throws Exception {
         int split0End = 7;
         int split1End = 15;
-        FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> 
elementsQueue =
-                new FutureCompletingBlockingQueue<>();
         MockSplitReader mockSplitReader =
                 MockSplitReader.newBuilder()
                         .setNumRecordsPerSplitPerFetch(2)
@@ -353,9 +327,7 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
                         .build();
         MockSourceReader reader =
                 new MockSourceReader(
-                        elementsQueue,
-                        new SingleThreadFetcherManager<>(
-                                elementsQueue, () -> mockSplitReader, 
getConfig()),
+                        new SingleThreadFetcherManager<>(() -> 
mockSplitReader, getConfig()),
                         getConfig(),
                         new TestingReaderContext(),
                         i -> i == split0End || i == split1End);
@@ -392,15 +364,12 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
 
     @Override
     protected MockSourceReader createReader() {
-        FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> 
elementsQueue =
-                new FutureCompletingBlockingQueue<>();
         MockSplitReader mockSplitReader =
                 MockSplitReader.newBuilder()
                         .setNumRecordsPerSplitPerFetch(2)
                         .setBlockingFetch(true)
                         .build();
-        return new MockSourceReader(
-                elementsQueue, () -> mockSplitReader, getConfig(), new 
TestingReaderContext());
+        return new MockSourceReader(() -> mockSplitReader, getConfig(), new 
TestingReaderContext());
     }
 
     @Override
@@ -446,13 +415,9 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
     private static <E> SourceReader<E, ?> createReaderAndAwaitAvailable(
             final String splitId, final RecordsWithSplitIds<E> records) throws 
Exception {
 
-        final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue =
-                new FutureCompletingBlockingQueue<>();
-
         final SourceReader<E, TestingSourceSplit> reader =
                 new SingleThreadMultiplexSourceReaderBase<
                         E, E, TestingSourceSplit, TestingSourceSplit>(
-                        elementsQueue,
                         () -> new TestingSplitReader<>(records),
                         new PassThroughRecordEmitter<>(),
                         new Configuration(),
@@ -489,6 +454,7 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
     }
 
     // ------------------ Test helper classes -------------------
+
     /**
     * When maybeShutdownFinishedFetchers is invoked, 
BlockingShutdownSplitFetcherManager will
      * complete the inShutdownSplitFetcherFuture and ensures that all the 
split fetchers are
@@ -500,10 +466,8 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
         private final CompletableFuture<Void> inShutdownSplitFetcherFuture;
 
         public BlockingShutdownSplitFetcherManager(
-                FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
-                Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
-                Configuration configuration) {
-            super(elementsQueue, splitReaderSupplier, configuration);
+                Supplier<SplitReader<E, SplitT>> splitReaderSupplier, 
Configuration configuration) {
+            super(splitReaderSupplier, configuration);
             this.inShutdownSplitFetcherFuture = new CompletableFuture<>();
         }
 
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
index 11cd62bbf4d..08e7ac45df9 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.connector.base.source.reader.fetcher;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
 import 
org.apache.flink.connector.base.source.reader.mocks.TestingRecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit;
 import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
@@ -61,7 +62,7 @@ public class SplitFetcherManagerTest {
         TestingSplitReader<Object, TestingSourceSplit> reader = new 
TestingSplitReader<>();
         reader.setCloseWithException();
         SplitFetcherManager<Object, TestingSourceSplit> fetcherManager =
-                createFetcher("test-split", new 
FutureCompletingBlockingQueue<>(), reader);
+                createFetcher("test-split", reader, new Configuration());
         fetcherManager.close(1000L);
         assertThatThrownBy(fetcherManager::checkErrors)
                 .hasRootCauseMessage("Artificial exception on closing the 
split reader.");
@@ -74,21 +75,21 @@ public class SplitFetcherManagerTest {
             final RecordsWithSplitIds<Integer>... fetchesBeforeError) throws 
Exception {
         final IOException testingException = new IOException("test");
 
-        final FutureCompletingBlockingQueue<RecordsWithSplitIds<Integer>> 
queue =
-                new FutureCompletingBlockingQueue<>(10);
         final AwaitingReader<Integer, TestingSourceSplit> reader =
                 new AwaitingReader<>(testingException, fetchesBeforeError);
+        final Configuration configuration = new Configuration();
+        configuration.set(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 10);
         final SplitFetcherManager<Integer, TestingSourceSplit> fetcher =
-                createFetcher("testSplit", queue, reader);
+                createFetcher("testSplit", reader, configuration);
 
         reader.awaitAllRecordsReturned();
-        drainQueue(queue);
+        drainQueue(fetcher.getQueue());
 
-        assertThat(queue.getAvailabilityFuture().isDone()).isFalse();
+        
assertThat(fetcher.getQueue().getAvailabilityFuture().isDone()).isFalse();
         reader.triggerThrowException();
 
         // await the error propagation
-        queue.getAvailabilityFuture().get();
+        fetcher.getQueue().getAvailabilityFuture().get();
 
         try {
             fetcher.checkErrors();
@@ -106,11 +107,11 @@ public class SplitFetcherManagerTest {
 
     private static <E> SplitFetcherManager<E, TestingSourceSplit> 
createFetcher(
             final String splitId,
-            final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> queue,
-            final SplitReader<E, TestingSourceSplit> reader) {
+            final SplitReader<E, TestingSourceSplit> reader,
+            final Configuration configuration) {
 
         final SingleThreadFetcherManager<E, TestingSourceSplit> fetcher =
-                new SingleThreadFetcherManager<>(queue, () -> reader, new 
Configuration());
+                new SingleThreadFetcherManager<>(() -> reader, configuration);
         fetcher.addSplits(Collections.singletonList(new 
TestingSourceSplit(splitId)));
         return fetcher;
     }
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherPauseResumeSplitReaderTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherPauseResumeSplitReaderTest.java
index f69fa03f9e4..6487802ae30 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherPauseResumeSplitReaderTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherPauseResumeSplitReaderTest.java
@@ -22,11 +22,10 @@ import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
 import org.apache.flink.connector.base.source.reader.mocks.MockSourceReader;
 import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
-import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
 import org.apache.flink.core.io.InputStatus;
@@ -157,10 +156,8 @@ public class SplitFetcherPauseResumeSplitReaderTest {
             extends SingleThreadFetcherManager<E, SplitT> {
 
         public MockSteppingSplitFetcherManager(
-                FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
-                Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
-                Configuration configuration) {
-            super(elementsQueue, splitReaderSupplier, configuration);
+                Supplier<SplitReader<E, SplitT>> splitReaderSupplier, 
Configuration configuration) {
+            super(splitReaderSupplier, configuration);
         }
 
         @Override
@@ -215,14 +212,11 @@ public class SplitFetcherPauseResumeSplitReaderTest {
         public SteppingSourceReaderTestHarness(
                 Supplier<SplitReader<int[], MockSourceSplit>> 
splitReaderSupplier,
                 Configuration configuration) {
-            FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> queue =
-                    new FutureCompletingBlockingQueue<>(10);
+            configuration.set(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 10);
             this.fetcherManager =
-                    new MockSteppingSplitFetcherManager<>(
-                            queue, splitReaderSupplier, configuration);
+                    new MockSteppingSplitFetcherManager<>(splitReaderSupplier, 
configuration);
             this.sourceReader =
-                    new MockSourceReader(
-                            queue, fetcherManager, configuration, new 
TestingReaderContext());
+                    new MockSourceReader(fetcherManager, configuration, new 
TestingReaderContext());
         }
 
         private static List<MockSourceSplit> createPrefilledSplits(int 
numSplits, int numRecords) {
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
index 11bf88f0f37..5193b9358b5 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
@@ -27,9 +27,7 @@ import 
org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
-import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.util.InstantiationUtil;
 
@@ -66,8 +64,6 @@ public class MockBaseSource implements Source<Integer, 
MockSourceSplit, List<Moc
 
     @Override
     public SourceReader<Integer, MockSourceSplit> 
createReader(SourceReaderContext readerContext) {
-        FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> 
elementsQueue =
-                new FutureCompletingBlockingQueue<>();
 
         Configuration config = new Configuration();
         config.set(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1);
@@ -76,7 +72,7 @@ public class MockBaseSource implements Source<Integer, 
MockSourceSplit, List<Moc
                 MockSplitReader.newBuilder()
                         .setNumRecordsPerSplitPerFetch(2)
                         .setBlockingFetch(true);
-        return new MockSourceReader(elementsQueue, builder::build, config, 
readerContext);
+        return new MockSourceReader(builder::build, config, readerContext);
     }
 
     @Override
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
index 79cf9a82aa2..87bc7810c31 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
@@ -22,11 +22,9 @@ import 
org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordEvaluator;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import 
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
 import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
-import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 
 import java.util.Map;
 import java.util.function.Supplier;
@@ -37,25 +35,17 @@ public class MockSourceReader
                 int[], Integer, MockSourceSplit, MockSplitState> {
 
     public MockSourceReader(
-            FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> 
elementsQueue,
             Supplier<SplitReader<int[], MockSourceSplit>> splitFetcherSupplier,
             Configuration config,
             SourceReaderContext context) {
-        super(
-                elementsQueue,
-                splitFetcherSupplier,
-                new MockRecordEmitter(context.metricGroup()),
-                config,
-                context);
+        super(splitFetcherSupplier, new 
MockRecordEmitter(context.metricGroup()), config, context);
     }
 
     public MockSourceReader(
-            FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> 
elementsQueue,
             SingleThreadFetcherManager<int[], MockSourceSplit> 
splitSplitFetcherManager,
             Configuration config,
             SourceReaderContext context) {
         super(
-                elementsQueue,
                 splitSplitFetcherManager,
                 new MockRecordEmitter(context.metricGroup()),
                 config,
@@ -63,13 +53,11 @@ public class MockSourceReader
     }
 
     public MockSourceReader(
-            FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> 
elementsQueue,
             SingleThreadFetcherManager<int[], MockSourceSplit> 
splitSplitFetcherManager,
             Configuration config,
             SourceReaderContext context,
             RecordEvaluator<Integer> recordEvaluator) {
         super(
-                elementsQueue,
                 splitSplitFetcherManager,
                 new MockRecordEmitter(context.metricGroup()),
                 recordEvaluator,

Reply via email to