This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f4e33b591 [feature][connector][common] Add  
`SingleThreadMultiplexSourceReaderBase (#3335)
f4e33b591 is described below

commit f4e33b591278fac52e025ed0e9c4e7a9cb5acfd4
Author: hailin0 <[email protected]>
AuthorDate: Fri Nov 11 10:13:19 2022 +0800

    [feature][connector][common] Add  `SingleThreadMultiplexSourceReaderBase 
(#3335)
    
    * [feature][connector][common] Add  `SingleThreadMultiplexSourceReaderBase`
    
    * [translation][spark] Adapt to spark micro batch
---
 LICENSE                                            |   1 +
 .../common/source/reader/RecordEmitter.java        |  47 +++++
 .../common/source/reader/RecordsBySplits.java      |  62 ++++++
 .../common/source/reader/RecordsWithSplitIds.java  |  52 +++++
 .../SingleThreadMultiplexSourceReaderBase.java     |  74 +++++++
 .../common/source/reader/SourceReaderBase.java     | 235 +++++++++++++++++++++
 .../common/source/reader/SourceReaderOptions.java  |  54 +++++
 .../source/reader/fetcher/AddSplitsTask.java       |  48 +++++
 .../common/source/reader/fetcher/FetchTask.java    |  91 ++++++++
 .../reader/fetcher/SingleThreadFetcherManager.java |  67 ++++++
 .../common/source/reader/fetcher/SplitFetcher.java | 220 +++++++++++++++++++
 .../source/reader/fetcher/SplitFetcherManager.java | 146 +++++++++++++
 .../source/reader/fetcher/SplitFetcherTask.java    |  34 +++
 .../source/reader/splitreader/SplitReader.java     |  63 ++++++
 .../source/reader/splitreader/SplitsAddition.java  |  31 +++
 .../source/reader/splitreader/SplitsChange.java    |  32 +++
 .../spark/common/InternalRowCollector.java         |   9 +
 .../source/batch/ParallelBatchPartitionReader.java |   5 +-
 .../CoordinatedMicroBatchPartitionReader.java      |  15 +-
 .../micro/ParallelMicroBatchPartitionReader.java   |  32 ++-
 20 files changed, 1306 insertions(+), 12 deletions(-)

diff --git a/LICENSE b/LICENSE
index 2927f5026..5d51f440a 100644
--- a/LICENSE
+++ b/LICENSE
@@ -213,6 +213,7 @@ 
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/RowKind.java fro
 
seatunnel-api/src/main/java/org/apache/seatunnel/api/state/CheckpointListener.java
 from https://github.com/apache/flink
 
seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/
    from  https://github.com/lightbend/config
 
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/
  from https://github.com/apache/flink
+seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/
  from https://github.com/apache/flink
 
seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/
   from https://github.com/apache/iceberg
 generate_client_protocol.sh                                                    
                                                             from 
https://github.com/hazelcast/hazelcast
 
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java
                          from https://github.com/hazelcast/hazelcast
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/RecordEmitter.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/RecordEmitter.java
new file mode 100644
index 000000000..9d2313f87
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/RecordEmitter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.reader;
+
+import org.apache.seatunnel.api.source.Collector;
+
+/**
+ * Emits records to the downstream {@link Collector}.
+ *
+ * @param <E> the type of the element handed to the emitter
+ *
+ * @param <T> the type of the records accepted by the {@link Collector}
+ *
+ * @param <SplitStateT> the type of the split state passed along with each element
+ *
+ */
+public interface RecordEmitter<E, T, SplitStateT> {
+
+    /**
+     * Process and emit the records to the {@link Collector}.
+     *
+     * @param element the element to process
+     *
+     * @param collector the collector that receives the emitted records
+     *
+     * @param splitState the state associated with the split the element was read from
+     *
+     * @throws Exception if the implementation fails to process or emit the element
+     *
+     */
+    void emitRecord(E element, Collector<T> collector, SplitStateT splitState) 
throws Exception;
+}
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/RecordsBySplits.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/RecordsBySplits.java
new file mode 100644
index 000000000..82d0807ae
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/RecordsBySplits.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.reader;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+public class RecordsBySplits<E> implements RecordsWithSplitIds<E> { // RecordsWithSplitIds backed by a map of split ID -> records
+
+    private final Set<String> finishedSplits; // handed out unchanged by finishedSplits()
+    private final Iterator<Map.Entry<String, Collection<E>>> splitsIterator; // walks the (split ID, records) entries once
+    private Iterator<E> recordsInCurrentSplit; // stays null until nextSplit() has selected a split
+
+    public RecordsBySplits(Map<String, Collection<E>> recordsBySplit,
+                           Set<String> finishedSplits) { // both arguments are null-checked below
+        this.splitsIterator = checkNotNull(recordsBySplit, 
"recordsBySplit").entrySet().iterator();
+        this.finishedSplits = checkNotNull(finishedSplits, "finishedSplits");
+    }
+
+    @Override
+    public String nextSplit() {
+        if (splitsIterator.hasNext()) {
+            Map.Entry<String, Collection<E>> next = splitsIterator.next();
+            recordsInCurrentSplit = next.getValue().iterator(); // records of the newly selected split
+            return next.getKey();
+        } else {
+            return null; // no entries left
+        }
+    }
+
+    @Override
+    public E nextRecordFromSplit() {
+        if (recordsInCurrentSplit == null) {
+            throw new IllegalStateException(); // nextSplit() has not selected a split yet
+        }
+        return recordsInCurrentSplit.hasNext() ? recordsInCurrentSplit.next() 
: null;
+    }
+
+    @Override
+    public Set<String> finishedSplits() {
+        return finishedSplits;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/RecordsWithSplitIds.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/RecordsWithSplitIds.java
new file mode 100644
index 000000000..10cd11b5b
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/RecordsWithSplitIds.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.reader;
+
+import java.util.Set;
+
+/**
+ * An interface for the elements passed from the fetchers to the source reader.
+ *
+ * @param <E> the type of the records returned by {@link #nextRecordFromSplit()}
+ *
+ */
+public interface RecordsWithSplitIds<E> {
+
+    /**
+     * Moves to the next split.
+     *
+     * @return the ID of the next split, or null if no splits are left.
+     */
+    String nextSplit();
+
+    /**
+     * Gets the next record from the current split.
+     *
+     * @return Returns null if no more records are left in this split.
+     */
+    E nextRecordFromSplit();
+
+    /**
+     * Get the finished splits.
+     *
+     * @return the IDs of the splits that are reported as finished
+     */
+    Set<String> finishedSplits();
+
+    default void recycle() {} // no-op unless an implementation overrides it
+}
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SingleThreadMultiplexSourceReaderBase.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SingleThreadMultiplexSourceReaderBase.java
new file mode 100644
index 000000000..3f670458c
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SingleThreadMultiplexSourceReaderBase.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.reader;
+
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplit;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SingleThreadFetcherManager;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.function.Supplier;
+
+/**
+ * A base for {@link SourceReader}s that read splits with one thread using one 
{@link SplitReader}.
+ *
+ * @param <E> The type of the records (the raw type that typically contains 
checkpointing information).
+ *
+ * @param <T> The final type of the records emitted by the source.
+ *
+ * @param <SplitT> The type of the splits handled by the reader.
+ *
+ * @param <SplitStateT> The type of the state kept for each split.
+ *
+ */
+public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT 
extends SourceSplit, SplitStateT>
+    extends SourceReaderBase<E, T, SplitT, SplitStateT> {
+
+    public SingleThreadMultiplexSourceReaderBase(Supplier<SplitReader<E, 
SplitT>> splitReaderSupplier,
+                                                 RecordEmitter<E, T, 
SplitStateT> recordEmitter,
+                                                 SourceReaderOptions options,
+                                                 SourceReader.Context context) 
{
+        this(new ArrayBlockingQueue<>(options.getElementQueueCapacity()), // bounded queue sized from the reader options
+            splitReaderSupplier,
+            recordEmitter,
+            options,
+            context);
+    }
+
+    public 
SingleThreadMultiplexSourceReaderBase(BlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
+                                                 Supplier<SplitReader<E, 
SplitT>> splitReaderSupplier,
+                                                 RecordEmitter<E, T, 
SplitStateT> recordEmitter,
+                                                 SourceReaderOptions options,
+                                                 SourceReader.Context context) 
{
+        super(elementsQueue, // the same queue instance is also given to the fetcher manager created below
+            new SingleThreadFetcherManager<>(elementsQueue, 
splitReaderSupplier),
+            recordEmitter,
+            options,
+            context);
+    }
+
+    public 
SingleThreadMultiplexSourceReaderBase(BlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
+                                                 SingleThreadFetcherManager<E, 
SplitT> splitFetcherManager,
+                                                 RecordEmitter<E, T, 
SplitStateT> recordEmitter,
+                                                 SourceReaderOptions options,
+                                                 SourceReader.Context context) 
{
+        super(elementsQueue, splitFetcherManager, recordEmitter, options, 
context);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
new file mode 100644
index 000000000..9c81c9b73
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderBase.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.reader;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceEvent;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplit;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * An abstract implementation of {@link SourceReader} which provides some 
synchronization between
+ * the mail box main thread and the SourceReader internal threads. This class 
allows user to just
+ * provide a {@link SplitReader} and snapshot the split state.
+ *
+ * @param <E>           The type of the records (the raw type that typically 
contains checkpointing information).
+ *
+ * @param <T>           The final type of the records emitted by the source.
+ *
+ * @param <SplitT>      The type of the splits assigned to this reader.
+ *
+ * @param <SplitStateT> The type of the mutable state kept for each split.
+ *
+ */
+@Slf4j
+public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, 
SplitStateT>
+    implements SourceReader<T, SplitT> {
+    private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue; // polled by getNextFetch()
+    private final Map<String, SplitContext<T, SplitStateT>> splitStates; // split ID -> context of every registered, unfinished split
+    protected final RecordEmitter<E, T, SplitStateT> recordEmitter;
+    protected final SplitFetcherManager<E, SplitT> splitFetcherManager;
+    protected final SourceReaderOptions options;
+    protected final SourceReader.Context context;
+
+    private RecordsWithSplitIds<E> currentFetch; // fetch currently being drained; null between fetches
+    private SplitContext<T, SplitStateT> currentSplitContext; // context of the split currently being emitted
+    private Collector<T> currentSplitOutput; // collector used for the current split
+    private boolean noMoreSplitsAssignment; // set by handleNoMoreSplits()
+
+    public SourceReaderBase(BlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
+                            SplitFetcherManager<E, SplitT> splitFetcherManager,
+                            RecordEmitter<E, T, SplitStateT> recordEmitter,
+                            SourceReaderOptions options,
+                            SourceReader.Context context) {
+        this.elementsQueue = elementsQueue;
+        this.splitFetcherManager = splitFetcherManager;
+        this.recordEmitter = recordEmitter;
+        this.splitStates = new HashMap<>();
+        this.options = options;
+        this.context = context;
+    }
+
+    @Override
+    public void open() {
+        log.info("Open Source Reader.");
+    }
+
+    @Override
+    public void pollNext(Collector<T> output) throws Exception {
+        RecordsWithSplitIds<E> recordsWithSplitId = this.currentFetch; // resume a partially drained fetch, if any
+        if (recordsWithSplitId == null) {
+            recordsWithSplitId = getNextFetch(output);
+            if (recordsWithSplitId == null) {
+                if (Boundedness.BOUNDED.equals(context.getBoundedness())
+                    && noMoreSplitsAssignment
+                    && splitFetcherManager.maybeShutdownFinishedFetchers()
+                    && elementsQueue.isEmpty()) {
+                    context.signalNoMoreElement();
+                    log.info("Send NoMoreElement event");
+                }
+                return; // nothing to emit in this call
+            }
+        }
+
+        E record = recordsWithSplitId.nextRecordFromSplit();
+        if (record != null) {
+            synchronized (output.getCheckpointLock()) { // emit while holding the collector's checkpoint lock
+                recordEmitter.emitRecord(record, currentSplitOutput, 
currentSplitContext.state);
+            }
+            log.trace("Emitted record: {}", record);
+        } else if (!moveToNextSplit(recordsWithSplitId, output)) {
+            pollNext(output); // the fetch is exhausted and was reset: retry with the next fetch from the queue
+        }
+    }
+
+    @Override
+    public List<SplitT> snapshotState(long checkpointId) {
+        List<SplitT> splits = new ArrayList<>();
+        splitStates.forEach((id, context) -> splits.add(toSplitType(id, 
context.state)));
+        log.info("Snapshot state from splits: {}", splits);
+        return splits;
+    }
+
+    @Override
+    public void addSplits(List<SplitT> splits) {
+        log.info("Adding split(s) to reader: {}", splits);
+        splits.forEach(split -> {
+            // Initialize the state for each split.
+            splitStates.put(split.splitId(), new 
SplitContext<>(split.splitId(), initializedState(split)));
+        });
+        splitFetcherManager.addSplits(splits);
+    }
+
+    @Override
+    public void handleNoMoreSplits() {
+        log.info("Reader received NoMoreSplits event.");
+        noMoreSplitsAssignment = true;
+    }
+
+    @Override
+    public void handleSourceEvent(SourceEvent sourceEvent) {
+        log.info("Received unhandled source event: {}", sourceEvent);
+    }
+
+    @Override
+    public void close() {
+        log.info("Closing Source Reader.");
+        try {
+            splitFetcherManager.close(options.getSourceReaderCloseTimeout());
+        } catch (Exception e) {
+            throw new RuntimeException(e); // rethrown unchecked, original exception kept as the cause
+        }
+    }
+
+    private RecordsWithSplitIds<E> getNextFetch(Collector<T> output) {
+        splitFetcherManager.checkErrors();
+        RecordsWithSplitIds<E> recordsWithSplitId = elementsQueue.poll(); // non-blocking; null when the queue is empty
+        if (recordsWithSplitId == null || !moveToNextSplit(recordsWithSplitId, 
output)) {
+            log.info("Current fetch is finished."); // NOTE(review): logged at info on every poll that yields no fetch
+            return null;
+        }
+
+        currentFetch = recordsWithSplitId;
+        return recordsWithSplitId;
+    }
+
+    private boolean moveToNextSplit(RecordsWithSplitIds<E> 
recordsWithSplitIds, Collector<T> output) {
+        final String nextSplitId = recordsWithSplitIds.nextSplit();
+        if (nextSplitId == null) {
+            log.trace("Current fetch is finished.");
+            finishCurrentFetch(recordsWithSplitIds, output);
+            return false; // no split left in this fetch
+        }
+
+        currentSplitContext = splitStates.get(nextSplitId);
+        checkState(currentSplitContext != null, "Have records for a split that 
was not registered");
+        currentSplitOutput = 
currentSplitContext.getOrCreateSplitOutput(output);
+        log.trace("Emitting records from fetch for split {}", nextSplitId);
+        return true;
+    }
+
+    private void finishCurrentFetch(final RecordsWithSplitIds<E> fetch, final 
Collector<T> output) {
+        currentFetch = null;
+        currentSplitContext = null;
+        currentSplitOutput = null;
+
+        Set<String> finishedSplits = fetch.finishedSplits();
+        if (!finishedSplits.isEmpty()) {
+            log.info("Finished reading split(s) {}", finishedSplits);
+            Map<String, SplitStateT> stateOfFinishedSplits = new HashMap<>();
+            for (String finishedSplitId : finishedSplits) { // NOTE(review): the remove() below is dereferenced without a null check
+                stateOfFinishedSplits.put(finishedSplitId, 
splitStates.remove(finishedSplitId).state);
+            }
+            onSplitFinished(stateOfFinishedSplits);
+        }
+
+        fetch.recycle();
+    }
+
+    /**
+     * Handles the finished splits to clean the state if needed.
+     *
+     * @param finishedSplitIds the finished splits, as a map from split ID to the state that was kept for that split
+     *
+     */
+    protected abstract void onSplitFinished(Map<String, SplitStateT> 
finishedSplitIds);
+
+    /**
+     * Called for each split that is added to the reader to create the initial state of that split.
+     *
+     * @param split a newly added split.
+     */
+    protected abstract SplitStateT initializedState(SplitT split);
+
+    /**
+     * Convert a mutable SplitStateT to immutable SplitT.
+     *
+     * @param splitState the mutable state of the split identified by {@code splitId}.
+     * @return an immutable Split state.
+     */
+    protected abstract SplitT toSplitType(String splitId, SplitStateT 
splitState);
+
+    @RequiredArgsConstructor
+    private static final class SplitContext<T, SplitStateT> {
+        final String splitId;
+        final SplitStateT state;
+        Collector<T> splitOutput; // assigned on the first getOrCreateSplitOutput() call
+
+        Collector<T> getOrCreateSplitOutput(Collector<T> output) {
+            if (splitOutput == null) {
+                splitOutput = output; // the first collector seen is kept and reused for this split
+            }
+            return splitOutput;
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderOptions.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderOptions.java
new file mode 100644
index 000000000..5df2efe55
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/SourceReaderOptions.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.reader;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Getter;
+
+@Getter
+@SuppressWarnings("MagicNumber")
+public class SourceReaderOptions { // holds the two reader options read from the configuration
+    public static final Option<Long> SOURCE_READER_CLOSE_TIMEOUT =
+        Options.key("source.reader.close.timeout")
+            .longType()
+            .defaultValue(30000L) // NOTE(review): unit is not stated here — presumably milliseconds; confirm
+            .withDescription("The timeout when closing the source reader");
+
+    public static final Option<Integer> ELEMENT_QUEUE_CAPACITY =
+        Options.key("source.reader.element.queue.capacity")
+            .intType()
+            .defaultValue(2)
+            .withDescription("The capacity of the element queue in the source 
reader.");
+
+    public final long sourceReaderCloseTimeout; // value read for SOURCE_READER_CLOSE_TIMEOUT
+    public final int elementQueueCapacity; // value read for ELEMENT_QUEUE_CAPACITY
+
+    public SourceReaderOptions(Config config) {
+        this(ReadonlyConfig.fromConfig(config)); // delegates to the ReadonlyConfig constructor
+    }
+
+    public SourceReaderOptions(ReadonlyConfig config) {
+        this.sourceReaderCloseTimeout = 
config.get(SOURCE_READER_CLOSE_TIMEOUT);
+        this.elementQueueCapacity = config.get(ELEMENT_QUEUE_CAPACITY);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/AddSplitsTask.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/AddSplitsTask.java
new file mode 100644
index 000000000..41e527a36
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/AddSplitsTask.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitsAddition;
+
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+
+import java.util.Collection;
+import java.util.Map;
+
+@RequiredArgsConstructor
+@ToString(of = {"splitsToAdd"})
+class AddSplitsTask<SplitT extends SourceSplit> implements SplitFetcherTask { // task that registers new splits and passes them to the split reader
+    private final SplitReader<?, SplitT> splitReader;
+    private final Collection<SplitT> splitsToAdd;
+    private final Map<String, SplitT> assignedSplits; // split ID -> split; filled by run()
+
+    @Override
+    public void run() {
+        for (SplitT s : splitsToAdd) {
+            assignedSplits.put(s.splitId(), s); // record the split under its ID before notifying the reader
+        }
+        splitReader.handleSplitsChanges(new SplitsAddition<>(splitsToAdd));
+    }
+
+    @Override
+    public void wakeUp() { // no-op
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/FetchTask.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/FetchTask.java
new file mode 100644
index 000000000..42b71c927
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/FetchTask.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+@Slf4j
+@RequiredArgsConstructor
+class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask { // task that fetches one batch and offers it to the elements queue
+    private static final int OFFER_TIMEOUT_MILLIS = 10000; // upper bound for one offer attempt in run()
+
+    private final SplitReader<E, SplitT> splitReader;
+    private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
+    private final Consumer<Collection<String>> splitFinishedCallback; // invoked with the finished split IDs after a successful offer
+    private final int fetcherIndex; // only used in log messages here
+
+    @Getter(value = AccessLevel.PRIVATE)
+    private volatile boolean wakeup;
+    private volatile RecordsWithSplitIds<E> lastRecords; // fetched but not yet enqueued; kept so a later run() retries the offer
+
+    @Override
+    public void run() throws IOException {
+        try {
+            if (!isWakeup() && lastRecords == null) { // fetch only when no earlier batch is still pending
+                lastRecords = splitReader.fetch();
+                log.debug("Fetch records from split fetcher {}", fetcherIndex);
+            }
+
+            if (!isWakeup()) {
+                if (elementsQueue.offer(lastRecords, OFFER_TIMEOUT_MILLIS, 
TimeUnit.MILLISECONDS)) {
+                    if (!lastRecords.finishedSplits().isEmpty()) {
+                        
splitFinishedCallback.accept(lastRecords.finishedSplits());
+                    }
+                    lastRecords = null; // batch handed over; the next run() fetches again
+                    log.debug("Enqueued records from split fetcher {}", 
fetcherIndex);
+                } else {
+                    log.debug("Enqueuing timed out in split fetcher {}, queue 
is blocked", fetcherIndex);
+                }
+            }
+        } catch (InterruptedException e) {
+            // this should only happen on shutdown
+            throw new IOException("Source fetch execution was interrupted", e); // NOTE(review): the thread's interrupt flag is not restored here
+        } finally {
+            // clean up the potential wakeup effect.
+            if (isWakeup()) {
+                wakeup = false;
+            }
+        }
+    }
+
+    @Override
+    public void wakeUp() {
+        // Set the wakeup flag first.
+        wakeup = true;
+
+        if (lastRecords == null) {
+            splitReader.wakeUp(); // no batch pending, so the task may be inside fetch(): wake the reader
+        } else {
+            // the task is enqueuing the records,
+            // or waiting for the offer into the queue to time out, see {@link #run()}
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/SingleThreadFetcherManager.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/SingleThreadFetcherManager.java
new file mode 100644
index 000000000..137cbe79b
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/SingleThreadFetcherManager.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
+
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * A Fetcher Manager with a single fetching thread (I/O thread) that handles 
all splits concurrently.
+ *
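+ * @param <E> the type of the elements carried by the fetched {@link RecordsWithSplitIds}
+ *
+ * @param <SplitT> the type of {@link SourceSplit} assigned to the fetcher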
+ *
+ */
+public class SingleThreadFetcherManager<E, SplitT extends SourceSplit>
+        extends SplitFetcherManager<E, SplitT> {
+
+    public SingleThreadFetcherManager(BlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
+                                      Supplier<SplitReader<E, SplitT>> 
splitReaderSupplier) {
+        super(elementsQueue, splitReaderSupplier);
+    }
+
+    public SingleThreadFetcherManager(BlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
+                                      Supplier<SplitReader<E, SplitT>> 
splitReaderSupplier,
+                                      Consumer<Collection<String>> 
splitFinishedHook) {
+        super(elementsQueue, splitReaderSupplier, splitFinishedHook);
+    }
+
+    @Override
+    public void addSplits(Collection<SplitT> splitsToAdd) {
+        SplitFetcher<E, SplitT> fetcher = getRunningFetcher();
+        if (fetcher == null) {
+            fetcher = createSplitFetcher();
+            fetcher.addSplits(splitsToAdd);
+
+            startFetcher(fetcher);
+        } else {
+            fetcher.addSplits(splitsToAdd);
+        }
+    }
+
+    protected SplitFetcher<E, SplitT> getRunningFetcher() {
+        return fetchers.isEmpty() ? null : fetchers.values().iterator().next();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/SplitFetcher.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/SplitFetcher.java
new file mode 100644
index 000000000..b48b4d5c6
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/SplitFetcher.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
+
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+@Slf4j
+public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
+    @Getter
+    private final int fetcherId;
+    private final Deque<SplitFetcherTask> taskQueue = new ArrayDeque<>();
+    @Getter
+    private final Map<String, SplitT> assignedSplits = new HashMap<>();
+    @Getter
+    private final SplitReader<E, SplitT> splitReader;
+    private final Consumer<Throwable> errorHandler;
+    private final Runnable shutdownHook;
+    private final FetchTask fetchTask;
+
+    private volatile boolean closed;
+    private volatile SplitFetcherTask runningTask = null;
+
+    private final ReentrantLock lock = new ReentrantLock();
+    private final Condition nonEmpty = lock.newCondition();
+
+    SplitFetcher(int fetcherId,
+                 @NonNull BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
+                 @NonNull SplitReader<E, SplitT> splitReader,
+                 @NonNull Consumer<Throwable> errorHandler,
+                 @NonNull Runnable shutdownHook,
+                 @NonNull Consumer<Collection<String>> splitFinishedHook) {
+        this.fetcherId = fetcherId;
+        this.splitReader = splitReader;
+        this.errorHandler = errorHandler;
+        this.shutdownHook = shutdownHook;
+        this.fetchTask = new FetchTask<>(
+            splitReader,
+            elementsQueue,
+            finishedSplits -> {
+                finishedSplits.forEach(assignedSplits::remove);
+                splitFinishedHook.accept(finishedSplits);
+                log.info("Finished reading from splits {}", finishedSplits);
+            },
+            fetcherId);
+    }
+
+    @Override
+    public void run() {
+        log.info("Starting split fetcher {}", fetcherId);
+        try {
+            while (runOnce()) {
+                // nothing to do, everything is inside #runOnce.
+            }
+        } catch (Throwable t) {
+            errorHandler.accept(t);
+        } finally {
+            try {
+                splitReader.close();
+            } catch (Exception e) {
+                errorHandler.accept(e);
+            } finally {
+                log.info("Split fetcher {} exited.", fetcherId);
+                shutdownHook.run();
+            }
+        }
+    }
+
+    public void addSplits(@NonNull Collection<SplitT> splitsToAdd) {
+        lock.lock();
+        try {
+            addTaskUnsafe(new AddSplitsTask<>(splitReader, splitsToAdd, 
assignedSplits));
+            wakeUpUnsafe(true);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public void addTask(@NonNull SplitFetcherTask task) {
+        lock.lock();
+        try {
+            addTaskUnsafe(task);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public void shutdown() {
+        lock.lock();
+        try {
+            if (!closed) {
+                closed = true;
+                log.info("Shutting down split fetcher {}", fetcherId);
+                wakeUpUnsafe(false);
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public boolean isIdle() {
+        lock.lock();
+        try {
+            return assignedSplits.isEmpty() && taskQueue.isEmpty() && 
runningTask == null;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private boolean runOnce() {
+        lock.lock();
+        SplitFetcherTask nextTask;
+        try {
+            if (closed) {
+                return false;
+            }
+
+            nextTask = getNextTaskUnsafe();
+            if (nextTask == null) {
+                // (spurious) wakeup, so just repeat
+                return true;
+            }
+
+            log.debug("Prepare to run {}", nextTask);
+            // store task for #wakeUp
+            this.runningTask = nextTask;
+        } finally {
+            lock.unlock();
+        }
+
+        // execute the task outside of lock, so that it can be woken up
+        try {
+            nextTask.run();
+        } catch (Exception e) {
+            throw new RuntimeException(
+                String.format("SplitFetcher thread %d received unexpected 
exception while polling the records",
+                    fetcherId), e);
+        }
+
+        // re-acquire the lock, as all post-processing steps need it
+        lock.lock();
+        try {
+            this.runningTask = null;
+        } finally {
+            lock.unlock();
+        }
+        return true;
+    }
+
+    private SplitFetcherTask getNextTaskUnsafe() {
+        assert lock.isHeldByCurrentThread();
+
+        try {
+            if (!taskQueue.isEmpty()) {
+                // execute tasks in taskQueue first
+                return taskQueue.poll();
+            } else if (!assignedSplits.isEmpty()) {
+                // use fallback task = fetch if there is at least one split
+                return fetchTask;
+            } else {
+                // nothing to do, wait for signal
+                nonEmpty.await();
+                return taskQueue.poll();
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("The thread was interrupted while 
waiting for a fetcher task.");
+        }
+    }
+
+    private void wakeUpUnsafe(boolean taskOnly) {
+        assert lock.isHeldByCurrentThread();
+
+        SplitFetcherTask currentTask = runningTask;
+        if (currentTask != null) {
+            log.debug("Waking up running task {}", currentTask);
+            currentTask.wakeUp();
+        } else if (!taskOnly) {
+            log.debug("Waking up fetcher thread.");
+            nonEmpty.signal();
+        }
+    }
+
+    private void addTaskUnsafe(SplitFetcherTask task) {
+        assert lock.isHeldByCurrentThread();
+
+        taskQueue.add(task);
+        nonEmpty.signal();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/SplitFetcherManager.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/SplitFetcherManager.java
new file mode 100644
index 000000000..203372774
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/SplitFetcherManager.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * The split fetcher manager could be used to support different threading 
models by implementing
+ * the {@link #addSplits(Collection)} method differently. For example, a 
single thread split fetcher
+ * manager would only start a single fetcher and assign all the splits to it. 
A one-thread-per-split
+ * fetcher may spawn a new thread every time a new split is assigned.
+ *
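+ * @param <E> the type of the elements carried by the fetched {@link RecordsWithSplitIds}
+ *
+ * @param <SplitT> the type of {@link SourceSplit} assigned to the fetchers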
+ *
+ */
+@Slf4j
+public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
+    protected final Map<Integer, SplitFetcher<E, SplitT>> fetchers;
+    private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
+    private final Supplier<SplitReader<E, SplitT>> splitReaderFactory;
+    private final Consumer<Collection<String>> splitFinishedHook;
+    private final AtomicInteger fetcherIdGenerator;
+    private final AtomicReference<Throwable> uncaughtFetcherException;
+    private final Consumer<Throwable> errorHandler;
+    private final ExecutorService executors;
+    private volatile boolean closed;
+
+    public SplitFetcherManager(BlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
+                               Supplier<SplitReader<E, SplitT>> 
splitReaderFactory) {
+        this(elementsQueue, splitReaderFactory, ignore -> {});
+    }
+
+    public SplitFetcherManager(BlockingQueue<RecordsWithSplitIds<E>> 
elementsQueue,
+                               Supplier<SplitReader<E, SplitT>> 
splitReaderFactory,
+                               Consumer<Collection<String>> splitFinishedHook) 
{
+        this.fetchers = new ConcurrentHashMap<>();
+        this.elementsQueue = elementsQueue;
+        this.splitReaderFactory = splitReaderFactory;
+        this.splitFinishedHook = splitFinishedHook;
+        this.fetcherIdGenerator = new AtomicInteger(0);
+        this.uncaughtFetcherException = new AtomicReference<>(null);
+        this.errorHandler = throwable -> {
+            log.error("Received uncaught exception.", throwable);
+            if (!uncaughtFetcherException.compareAndSet(null, throwable)) {
+                // Add the exception to the exception list.
+                uncaughtFetcherException.get().addSuppressed(throwable);
+            }
+        };
+        String taskThreadName = Thread.currentThread().getName();
+        this.executors = Executors.newCachedThreadPool(
+            r -> new Thread(r, "Source Data Fetcher for " + taskThreadName));
+    }
+
+    public abstract void addSplits(Collection<SplitT> splitsToAdd);
+
+    protected void startFetcher(SplitFetcher<E, SplitT> fetcher) {
+        executors.submit(fetcher);
+    }
+
+    protected synchronized SplitFetcher<E, SplitT> createSplitFetcher() {
+        if (closed) {
+            throw new IllegalStateException("The split fetcher manager has 
closed.");
+        }
+        // Create SplitReader.
+        SplitReader<E, SplitT> splitReader = splitReaderFactory.get();
+        int fetcherId = fetcherIdGenerator.getAndIncrement();
+        SplitFetcher<E, SplitT> splitFetcher =
+            new SplitFetcher<>(
+                fetcherId,
+                elementsQueue,
+                splitReader,
+                errorHandler,
+                () -> {
+                    fetchers.remove(fetcherId);
+                },
+                this.splitFinishedHook);
+        fetchers.put(fetcherId, splitFetcher);
+        return splitFetcher;
+    }
+
+    public synchronized boolean maybeShutdownFinishedFetchers() {
+        Iterator<Map.Entry<Integer, SplitFetcher<E, SplitT>>> iter = 
fetchers.entrySet().iterator();
+        while (iter.hasNext()) {
+            Map.Entry<Integer, SplitFetcher<E, SplitT>> entry = iter.next();
+            SplitFetcher<E, SplitT> fetcher = entry.getValue();
+            if (fetcher.isIdle()) {
+                log.info("Closing splitFetcher {} because it is idle.", 
entry.getKey());
+                fetcher.shutdown();
+                iter.remove();
+            }
+        }
+        return fetchers.isEmpty();
+    }
+
+    public synchronized void close(long timeoutMs) throws Exception {
+        closed = true;
+        fetchers.values().forEach(SplitFetcher::shutdown);
+        executors.shutdown();
+        if (!executors.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
+            log.warn("Failed to close the source reader in {} ms. There are 
still {} split fetchers running",
+                timeoutMs,
+                fetchers.size());
+        }
+    }
+
+    public void checkErrors() {
+        if (uncaughtFetcherException.get() != null) {
+            throw new RuntimeException("One or more fetchers have encountered 
exception",
+                uncaughtFetcherException.get());
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/SplitFetcherTask.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/SplitFetcherTask.java
new file mode 100644
index 000000000..d36f976de
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/fetcher/SplitFetcherTask.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher;
+
+import java.io.IOException;
+
+public interface SplitFetcherTask {
+
+    /**
+     * Run the logic. This method allows throwing an interrupted exception on 
wakeup, but the
+     * implementation does not have to.
+     */
+    void run() throws IOException;
+
+    /**
+     * Wake up the running thread.
+     */
+    void wakeUp();
+}
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/splitreader/SplitReader.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/splitreader/SplitReader.java
new file mode 100644
index 000000000..d5ec80754
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/splitreader/SplitReader.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
+
+import java.io.IOException;
+
+/**
+ *
+ * An interface used to read from splits.
+ *
+ */
+public interface SplitReader<E, SplitT extends SourceSplit> {
+
+    /**
+     * Fetch elements into the blocking queue for the given splits. The fetch 
call could be blocking
+     * but it should get unblocked when {@link #wakeUp()} is invoked. In that 
case, the
+     * implementation may either decide to return without throwing an 
exception, or it can just
+     * throw an interrupted exception. In either case, this method should be 
reentrant, meaning that
+     * the next fetch call should just resume from where the last fetch call 
was woken up or
+     * interrupted.
+     *
+     */
+    RecordsWithSplitIds<E> fetch() throws IOException;
+
+    /**
+     * Handle the split changes. This call should be non-blocking.
+     *
+     * @param splitsChanges the split changes that this split reader has to handle
+     *
+     */
+    void handleSplitsChanges(SplitsChange<SplitT> splitsChanges);
+
+    /**
+     * Wake up the split reader in case the fetcher thread is blocking in 
{@link #fetch()}.
+     */
+    void wakeUp();
+
+    /**
+     * Close the split reader.
+     *
+     * @throws Exception if closing the split reader fails
+     *
+     */
+    void close() throws Exception;
+}
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/splitreader/SplitsAddition.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/splitreader/SplitsAddition.java
new file mode 100644
index 000000000..79d365330
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/splitreader/SplitsAddition.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader;
+
+import java.util.Collection;
+
+public class SplitsAddition<SplitT> extends SplitsChange<SplitT> {
+
+    public SplitsAddition(Collection<SplitT> splits) {
+        super(splits);
+    }
+
+    public String toString() {
+        return String.format("SplitAddition:[%s]", splits());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/splitreader/SplitsChange.java
 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/splitreader/SplitsChange.java
new file mode 100644
index 000000000..1b21a9a2d
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/splitreader/SplitsChange.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader;
+
+import lombok.AllArgsConstructor;
+
+import java.util.Collection;
+import java.util.Collections;
+
+@AllArgsConstructor
+public abstract class SplitsChange<SplitT> {
+    private final Collection<SplitT> splits;
+
+    public Collection<SplitT> splits() {
+        return Collections.unmodifiableCollection(splits);
+    }
+}
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/InternalRowCollector.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/InternalRowCollector.java
index 1a02c3a43..892873feb 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/InternalRowCollector.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/InternalRowCollector.java
@@ -25,15 +25,19 @@ import 
org.apache.seatunnel.translation.spark.common.serialization.InternalRowCo
 
 import org.apache.spark.sql.catalyst.InternalRow;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 public class InternalRowCollector implements Collector<SeaTunnelRow> {
     private final Handover<InternalRow> handover;
     private final Object checkpointLock;
     private final InternalRowConverter rowSerialization;
+    private final AtomicLong collectTotalCount;
 
     public InternalRowCollector(Handover<InternalRow> handover, Object 
checkpointLock, SeaTunnelDataType<?> dataType) {
         this.handover = handover;
         this.checkpointLock = checkpointLock;
         this.rowSerialization = new InternalRowConverter(dataType);
+        this.collectTotalCount = new AtomicLong(0);
     }
 
     @Override
@@ -42,11 +46,16 @@ public class InternalRowCollector implements 
Collector<SeaTunnelRow> {
             synchronized (checkpointLock) {
                 handover.produce(rowSerialization.convert(record));
             }
+            collectTotalCount.incrementAndGet();
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
 
+    public long collectTotalCount() {
+        return collectTotalCount.get();
+    }
+
     @Override
     public Object getCheckpointLock() {
         return this.checkpointLock;
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java
index 81c491e74..6fa2aa94a 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java
@@ -53,6 +53,7 @@ public class ParallelBatchPartitionReader {
     protected volatile boolean prepare = true;
 
     protected volatile BaseSourceFunction<SeaTunnelRow> internalSource;
+    protected volatile InternalRowCollector internalRowCollector;
 
     public ParallelBatchPartitionReader(SeaTunnelSource<SeaTunnelRow, ?, ?> 
source, Integer parallelism, Integer subtaskId) {
         this.source = source;
@@ -90,9 +91,11 @@ public class ParallelBatchPartitionReader {
             running = false;
             throw new RuntimeException("Failed to open internal source.", e);
         }
+
+        this.internalRowCollector = new InternalRowCollector(handover, 
checkpointLock, source.getProducedType());
         executorService.execute(() -> {
             try {
-                internalSource.run(new InternalRowCollector(handover, 
checkpointLock, source.getProducedType()));
+                internalSource.run(internalRowCollector);
             } catch (Exception e) {
                 handover.reportError(e);
                 log.error("BatchPartitionReader execute failed.", e);
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/micro/CoordinatedMicroBatchPartitionReader.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/micro/CoordinatedMicroBatchPartitionReader.java
index 83ca10cca..9aa326844 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/micro/CoordinatedMicroBatchPartitionReader.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/micro/CoordinatedMicroBatchPartitionReader.java
@@ -55,7 +55,20 @@ public class CoordinatedMicroBatchPartitionReader extends 
ParallelMicroBatchPart
     @Override
     public void virtualCheckpoint() {
         try {
-            internalCheckpoint(collectorMap.values().iterator(), 0);
+            int checkpointRetries = Math.max(1, CHECKPOINT_RETRIES);
+            do {
+                checkpointRetries--;
+                long collectedReader = 
collectorMap.values().stream().mapToLong(e -> e.collectTotalCount() > 0 ? 1 : 
0).sum();
+                if (collectedReader == 0) {
+                    Thread.sleep(CHECKPOINT_SLEEP_INTERVAL);
+                }
+
+                collectedReader = collectorMap.values().stream().mapToLong(e 
-> e.collectTotalCount() > 0 ? 1 : 0).sum();
+                if (collectedReader != 0 || checkpointRetries == 0) {
+                    checkpointRetries = 0;
+                    internalCheckpoint(collectorMap.values().iterator(), 0);
+                }
+            } while (checkpointRetries > 0);
         } catch (Exception e) {
             throw new RuntimeException("An error occurred in virtual 
checkpoint execution.", e);
         }
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/micro/ParallelMicroBatchPartitionReader.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/micro/ParallelMicroBatchPartitionReader.java
index 30c5c3402..35a6e09a7 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/micro/ParallelMicroBatchPartitionReader.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/micro/ParallelMicroBatchPartitionReader.java
@@ -44,6 +44,7 @@ import java.util.concurrent.TimeUnit;
 
 public class ParallelMicroBatchPartitionReader extends 
ParallelBatchPartitionReader {
     protected static final Integer CHECKPOINT_SLEEP_INTERVAL = 10;
+    protected static final Integer CHECKPOINT_RETRIES = 3;
     protected volatile Integer checkpointId;
     protected final Integer checkpointInterval;
     protected final String checkpointPath;
@@ -117,19 +118,30 @@ public class ParallelMicroBatchPartitionReader extends 
ParallelBatchPartitionRea
 
     public void virtualCheckpoint() {
         try {
-            synchronized (checkpointLock) {
-                while (!handover.isEmpty()) {
+            int checkpointRetries = Math.max(1, CHECKPOINT_RETRIES);
+            do {
+                checkpointRetries--;
+                if (internalRowCollector.collectTotalCount() == 0) {
                     Thread.sleep(CHECKPOINT_SLEEP_INTERVAL);
                 }
-                // Block #next() method
-                synchronized (handover) {
-                    final int currentCheckpoint = checkpointId;
-                    ReaderState readerState = snapshotState();
-                    saveState(readerState, currentCheckpoint);
-                    internalSource.notifyCheckpointComplete(currentCheckpoint);
-                    running = false;
+                synchronized (checkpointLock) {
+                    if (internalRowCollector.collectTotalCount() != 0 || 
checkpointRetries == 0) {
+                        checkpointRetries = 0;
+
+                        while (!handover.isEmpty()) {
+                            Thread.sleep(CHECKPOINT_SLEEP_INTERVAL);
+                        }
+                        // Block #next() method
+                        synchronized (handover) {
+                            final int currentCheckpoint = checkpointId;
+                            ReaderState readerState = snapshotState();
+                            saveState(readerState, currentCheckpoint);
+                            
internalSource.notifyCheckpointComplete(currentCheckpoint);
+                            running = false;
+                        }
+                    }
                 }
-            }
+            } while (checkpointRetries > 0);
         } catch (Exception e) {
             throw new RuntimeException("An error occurred in virtual 
checkpoint execution.", e);
         }

Reply via email to