This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9212a77140 [Improve][API] Optimize the enumerator API semantics and 
reduce lock calls at the connector level (#9671)
9212a77140 is described below

commit 9212a77140bb9a310237b4ef82126d42b5d767e6
Author: Jia Fan <[email protected]>
AuthorDate: Fri Aug 15 10:38:06 2025 +0800

    [Improve][API] Optimize the enumerator API semantics and reduce lock calls 
at the connector level (#9671)
---
 .../api/source/SourceSplitEnumerator.java          | 23 +++++-
 .../enumerator/TiDBSourceSplitEnumerator.java      |  4 +-
 .../split/ClickhouseSourceSplitEnumerator.java     | 18 ++---
 .../source/split/DorisSourceSplitEnumerator.java   | 22 +++---
 .../source/split/FileSourceSplitEnumerator.java    |  9 ++-
 .../MultipleTableFileSourceSplitEnumerator.java    |  9 ++-
 .../MultipleTableHiveSourceSplitEnumerator.java    |  9 ++-
 .../source/enumerator/AbstractSplitEnumerator.java | 22 +++---
 .../jdbc/source/JdbcSourceSplitEnumerator.java     | 22 +++---
 .../kafka/source/KafkaSourceSplitEnumerator.java   | 16 ++--
 .../kudu/source/KuduSourceSplitEnumerator.java     | 24 +++---
 .../source/MaxcomputeSourceSplitEnumerator.java    | 13 +++-
 .../milvus/source/MilvusSourceSplitEnumerator.java | 22 +++---
 .../source/enumerator/MongodbSplitEnumerator.java  | 26 ++++---
 .../source/enumerator/AbstractSplitEnumerator.java |  9 ++-
 .../PaimonBatchSourceSplitEnumerator.java          |  8 +-
 .../source/enumerator/PulsarSplitEnumerator.java   | 13 +++-
 .../source/RocketMqSourceSplitEnumerator.java      | 27 +++++--
 .../sls/source/SlsSourceSplitEnumerator.java       | 16 ++--
 .../source/TypesenseSourceSplitEnumerator.java     |  3 -
 .../flink/multitable/MultiTableSinkTest.java       |  7 +-
 .../config/inmemory_to_inmemory_multi_table.conf}  | 11 +--
 .../spark/multitable/MultiTableSinkTest.java       |  7 +-
 ....conf => inmemory_to_inmemory_multi_table.conf} | 10 +--
 .../seatunnel/multitable/MultiTableSinkTest.java   |  7 +-
 .../config/inmemory_to_inmemory_multi_table.conf}  | 10 +--
 .../e2e/source/inmemory/InMemorySource.java        | 82 ++++++++++++++++++++
 .../e2e/source/inmemory/InMemorySourceFactory.java | 54 +++++++++++++
 .../e2e/source/inmemory/InMemorySourceReader.java  | 82 ++++++++++++++++++++
 .../e2e/source/inmemory/InMemorySourceSplit.java   | 39 ++++++++++
 .../inmemory/InMemorySourceSplitEnumerator.java    | 88 ++++++++++++++++++++++
 .../e2e/source/inmemory/InMemoryState.java         | 22 ++++++
 .../server/task/SourceSplitEnumeratorTask.java     |  8 +-
 .../translation/source/CoordinatedSource.java      | 26 +++----
 .../flink/source/FlinkSourceEnumerator.java        | 30 ++++----
 35 files changed, 609 insertions(+), 189 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
index 897d18ac5f..d1488d00db 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
@@ -38,7 +38,20 @@ public interface SourceSplitEnumerator<SplitT extends 
SourceSplit, StateT>
 
     void open();
 
-    /** The method is executed by the engine only once. */
+    /**
+     * Executes engine setup steps in a fixed, non‑concurrent sequence.
+     *
+     * <p>Before the first {@link #run()} invocation, methods are called in 
this order:
+     *
+     * <ol>
+     *   <li>{@link #open()}
+     *   <li>{@link #addSplitsBack(List, int)}
+     *   <li>{@link #registerReader(int)}
+     * </ol>
+     *
+     * <p>{@implNote The engine guarantees this invocation order and ensures 
there are no
+     * concurrency issues between these calls.}
+     */
     void run() throws Exception;
 
     /**
@@ -63,7 +76,13 @@ public interface SourceSplitEnumerator<SplitT extends 
SourceSplit, StateT>
 
     void registerReader(int subtaskId);
 
-    /** If the source is bounded, checkpoint is not triggered. */
+    /**
+     * Used to snapshot the state of the enumerator.
+     *
+     * <p><strong>Concurrency Consideration:</strong><br>
+     * This method and {@link #run()} can be invoked concurrently by different 
threads.
+     * Systematically manage shared state access to prevent race conditions.
+     */
     StateT snapshotState(long checkpointId) throws Exception;
 
     /**
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/enumerator/TiDBSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/enumerator/TiDBSourceSplitEnumerator.java
index 466d056bc1..fc4b7dd332 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/enumerator/TiDBSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/enumerator/TiDBSourceSplitEnumerator.java
@@ -211,7 +211,9 @@ public class TiDBSourceSplitEnumerator
      */
     @Override
     public TiDBSourceCheckpointState snapshotState(long checkpointId) throws 
Exception {
-        return new TiDBSourceCheckpointState(shouldEnumerate, pendingSplit);
+        synchronized (stateLock) {
+            return new TiDBSourceCheckpointState(shouldEnumerate, 
pendingSplit);
+        }
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/ClickhouseSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/ClickhouseSourceSplitEnumerator.java
index 451d814e02..be90e84243 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/ClickhouseSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/ClickhouseSourceSplitEnumerator.java
@@ -121,16 +121,14 @@ public class ClickhouseSourceSplitEnumerator
     @Override
     public void addSplitsBack(List<ClickhouseSourceSplit> splits, int 
subtaskId) {
         if (!splits.isEmpty()) {
-            synchronized (stateLock) {
-                addPendingSplit(splits, subtaskId);
-                if (context.registeredReaders().contains(subtaskId)) {
-                    assignSplit(Collections.singletonList(subtaskId));
-                } else {
-                    LOG.warn(
-                            "Reader {} is not registered. Pending splits {} 
are not assigned.",
-                            subtaskId,
-                            splits);
-                }
+            addPendingSplit(splits, subtaskId);
+            if (context.registeredReaders().contains(subtaskId)) {
+                assignSplit(Collections.singletonList(subtaskId));
+            } else {
+                LOG.warn(
+                        "Reader {} is not registered. Pending splits {} are 
not assigned.",
+                        subtaskId,
+                        splits);
             }
         }
         LOG.info("Add back splits {} to JdbcSourceSplitEnumerator.", 
splits.size());
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/split/DorisSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/split/DorisSourceSplitEnumerator.java
index af18ac5629..c2f7cdf007 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/split/DorisSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/split/DorisSourceSplitEnumerator.java
@@ -107,16 +107,14 @@ public class DorisSourceSplitEnumerator
     public void addSplitsBack(List<DorisSourceSplit> splits, int subtaskId) {
         log.debug("Add back splits {} to DorisSourceSplitEnumerator.", splits);
         if (!splits.isEmpty()) {
-            synchronized (stateLock) {
-                addPendingSplit(splits);
-                if (context.registeredReaders().contains(subtaskId)) {
-                    assignSplit(Collections.singletonList(subtaskId));
-                } else {
-                    log.warn(
-                            "Reader {} is not registered. Pending splits {} 
are not assigned.",
-                            subtaskId,
-                            splits);
-                }
+            addPendingSplit(splits);
+            if (context.registeredReaders().contains(subtaskId)) {
+                assignSplit(Collections.singletonList(subtaskId));
+            } else {
+                log.warn(
+                        "Reader {} is not registered. Pending splits {} are 
not assigned.",
+                        subtaskId,
+                        splits);
             }
         }
     }
@@ -137,9 +135,7 @@ public class DorisSourceSplitEnumerator
     public void registerReader(int subtaskId) {
         log.debug("Register reader {} to DorisSourceSplitEnumerator.", 
subtaskId);
         if (!pendingSplit.isEmpty()) {
-            synchronized (stateLock) {
-                assignSplit(Collections.singletonList(subtaskId));
-            }
+            assignSplit(Collections.singletonList(subtaskId));
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java
index 544c759e21..dadb7066cf 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java
@@ -43,6 +43,7 @@ public class FileSourceSplitEnumerator
             new TreeSet<>(Comparator.comparing(FileSourceSplit::splitId));
     private Set<FileSourceSplit> assignedSplit;
     private final List<String> filePaths;
+    private final Object lock = new Object();
     private final AtomicInteger assignCount = new AtomicInteger(0);
 
     public FileSourceSplitEnumerator(
@@ -69,7 +70,9 @@ public class FileSourceSplitEnumerator
     public void run() {
         for (int i = 0; i < context.currentParallelism(); i++) {
             LOGGER.info("Assigned splits to reader [{}]", i);
-            assignSplit(i);
+            synchronized (lock) {
+                assignSplit(i);
+            }
         }
     }
 
@@ -139,7 +142,9 @@ public class FileSourceSplitEnumerator
 
     @Override
     public FileSourceState snapshotState(long checkpointId) {
-        return new FileSourceState(assignedSplit);
+        synchronized (lock) {
+            return new FileSourceState(assignedSplit);
+        }
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java
index 4f5938b85a..bfa6024479 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java
@@ -46,6 +46,7 @@ public class MultipleTableFileSourceSplitEnumerator
     private final Set<FileSourceSplit> assignedSplit;
     private final Map<String, List<String>> filePathMap;
     private final AtomicInteger assignCount = new AtomicInteger(0);
+    private final Object lock = new Object();
 
     public MultipleTableFileSourceSplitEnumerator(
             Context<FileSourceSplit> context,
@@ -107,7 +108,9 @@ public class MultipleTableFileSourceSplitEnumerator
 
     @Override
     public FileSourceState snapshotState(long checkpointId) {
-        return new FileSourceState(assignedSplit);
+        synchronized (lock) {
+            return new FileSourceState(assignedSplit);
+        }
     }
 
     @Override
@@ -155,7 +158,9 @@ public class MultipleTableFileSourceSplitEnumerator
     public void run() throws Exception {
         for (int i = 0; i < context.currentParallelism(); i++) {
             log.info("Assigned splits to reader [{}]", i);
-            assignSplit(i);
+            synchronized (lock) {
+                assignSplit(i);
+            }
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/MultipleTableHiveSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/MultipleTableHiveSourceSplitEnumerator.java
index d927f83436..96a8779e26 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/MultipleTableHiveSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/MultipleTableHiveSourceSplitEnumerator.java
@@ -47,6 +47,7 @@ public class MultipleTableHiveSourceSplitEnumerator
     private final Set<FileSourceSplit> assignedSplit;
     private final Map<String, List<String>> filePathMap;
     private final AtomicInteger assignCount = new AtomicInteger(0);
+    private final Object lock = new Object();
 
     public MultipleTableHiveSourceSplitEnumerator(
             SourceSplitEnumerator.Context<FileSourceSplit> context,
@@ -108,7 +109,9 @@ public class MultipleTableHiveSourceSplitEnumerator
 
     @Override
     public FileSourceState snapshotState(long checkpointId) {
-        return new FileSourceState(assignedSplit);
+        synchronized (lock) {
+            return new FileSourceState(assignedSplit);
+        }
     }
 
     @Override
@@ -156,7 +159,9 @@ public class MultipleTableHiveSourceSplitEnumerator
     public void run() throws Exception {
         for (int i = 0; i < context.currentParallelism(); i++) {
             log.info("Assigned splits to reader [{}]", i);
-            assignSplit(i);
+            synchronized (lock) {
+                assignSplit(i);
+            }
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
index 4dff9e67c9..39c874a63b 100644
--- 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
@@ -131,16 +131,14 @@ public abstract class AbstractSplitEnumerator
     @Override
     public void addSplitsBack(List<IcebergFileScanTaskSplit> splits, int 
subtaskId) {
         if (!splits.isEmpty()) {
-            synchronized (stateLock) {
-                addPendingSplits(splits);
-                if (context.registeredReaders().contains(subtaskId)) {
-                    assignPendingSplits(Collections.singleton(subtaskId));
-                } else {
-                    log.warn(
-                            "Reader {} is not registered. Pending splits {} 
are not assigned.",
-                            subtaskId,
-                            splits);
-                }
+            addPendingSplits(splits);
+            if (context.registeredReaders().contains(subtaskId)) {
+                assignPendingSplits(Collections.singleton(subtaskId));
+            } else {
+                log.warn(
+                        "Reader {} is not registered. Pending splits {} are 
not assigned.",
+                        subtaskId,
+                        splits);
             }
         }
         log.info("Add back splits {} to JdbcSourceSplitEnumerator.", 
splits.size());
@@ -163,9 +161,7 @@ public abstract class AbstractSplitEnumerator
     @Override
     public void registerReader(int subtaskId) {
         log.debug("Adding reader {} to IcebergSourceEnumerator.", subtaskId);
-        synchronized (stateLock) {
-            assignPendingSplits(Collections.singleton(subtaskId));
-        }
+        assignPendingSplits(Collections.singleton(subtaskId));
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
index 303e84678d..a0948dfe0d 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
@@ -103,16 +103,14 @@ public class JdbcSourceSplitEnumerator
     @Override
     public void addSplitsBack(List<JdbcSourceSplit> splits, int subtaskId) {
         if (!splits.isEmpty()) {
-            synchronized (stateLock) {
-                addPendingSplit(splits, subtaskId);
-                if (context.registeredReaders().contains(subtaskId)) {
-                    assignSplit(Collections.singletonList(subtaskId));
-                } else {
-                    LOG.warn(
-                            "Reader {} is not registered. Pending splits {} 
are not assigned.",
-                            subtaskId,
-                            splits);
-                }
+            addPendingSplit(splits, subtaskId);
+            if (context.registeredReaders().contains(subtaskId)) {
+                assignSplit(Collections.singletonList(subtaskId));
+            } else {
+                LOG.warn(
+                        "Reader {} is not registered. Pending splits {} are 
not assigned.",
+                        subtaskId,
+                        splits);
             }
         }
         LOG.info("Add back splits {} to JdbcSourceSplitEnumerator.", 
splits.size());
@@ -134,9 +132,7 @@ public class JdbcSourceSplitEnumerator
     public void registerReader(int subtaskId) {
         LOG.info("Register reader {} to JdbcSourceSplitEnumerator.", 
subtaskId);
         if (!pendingSplits.isEmpty()) {
-            synchronized (stateLock) {
-                assignSplit(Collections.singletonList(subtaskId));
-            }
+            assignSplit(Collections.singletonList(subtaskId));
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index 024f6416a8..26c61ddb2e 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -69,7 +69,7 @@ public class KafkaSourceSplitEnumerator
     private ScheduledExecutorService executor;
     private ScheduledFuture<?> scheduledFuture;
     private volatile boolean initialized;
-
+    private final Object lock = new Object();
     private final Map<String, TablePath> topicMappingTablePathMap = new 
HashMap<>();
 
     private boolean isStreamingMode;
@@ -145,9 +145,13 @@ public class KafkaSourceSplitEnumerator
 
     @Override
     public void run() throws ExecutionException, InterruptedException {
-        fetchPendingPartitionSplit();
-        setPartitionStartOffset();
-        assignSplit();
+        synchronized (lock) {
+            fetchPendingPartitionSplit();
+            setPartitionStartOffset();
+        }
+        synchronized (lock) {
+            assignSplit();
+        }
         if (!initialized) {
             initialized = true;
         }
@@ -288,7 +292,9 @@ public class KafkaSourceSplitEnumerator
 
     @Override
     public KafkaSourceState snapshotState(long checkpointId) throws Exception {
-        return new KafkaSourceState(new HashSet<>(assignedSplit.values()));
+        synchronized (lock) {
+            return new KafkaSourceState(new HashSet<>(assignedSplit.values()));
+        }
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplitEnumerator.java
index ee6574c274..469332da1c 100644
--- 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplitEnumerator.java
@@ -124,16 +124,14 @@ public class KuduSourceSplitEnumerator
     @Override
     public void addSplitsBack(List<KuduSourceSplit> splits, int subtaskId) {
         if (!splits.isEmpty()) {
-            synchronized (stateLock) {
-                addPendingSplit(splits, subtaskId);
-                if (enumeratorContext.registeredReaders().contains(subtaskId)) 
{
-                    assignSplit(Collections.singletonList(subtaskId));
-                } else {
-                    log.warn(
-                            "Reader {} is not registered. Pending splits {} 
are not assigned.",
-                            subtaskId,
-                            splits);
-                }
+            addPendingSplit(splits, subtaskId);
+            if (enumeratorContext.registeredReaders().contains(subtaskId)) {
+                assignSplit(Collections.singletonList(subtaskId));
+            } else {
+                log.warn(
+                        "Reader {} is not registered. Pending splits {} are 
not assigned.",
+                        subtaskId,
+                        splits);
             }
         }
         log.info("Add back splits {} to JdbcSourceSplitEnumerator.", 
splits.size());
@@ -192,10 +190,8 @@ public class KuduSourceSplitEnumerator
     @Override
     public void registerReader(int subtaskId) {
         log.debug("Register reader {} to KuduSourceSplitEnumerator.", 
subtaskId);
-        synchronized (stateLock) {
-            if (!pendingSplits.isEmpty()) {
-                assignSplit(Collections.singletonList(subtaskId));
-            }
+        if (!pendingSplits.isEmpty()) {
+            assignSplit(Collections.singletonList(subtaskId));
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java
index f44f27cb0c..e6685f0f85 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java
@@ -44,6 +44,7 @@ public class MaxcomputeSourceSplitEnumerator
     private Set<MaxcomputeSourceSplit> assignedSplits;
     private final ReadonlyConfig readonlyConfig;
     private final Map<TablePath, SourceTableInfo> sourceTableInfos;
+    private final Object stateLock = new Object();
 
     public MaxcomputeSourceSplitEnumerator(
             SourceSplitEnumerator.Context<MaxcomputeSourceSplit> 
enumeratorContext,
@@ -70,8 +71,12 @@ public class MaxcomputeSourceSplitEnumerator
 
     @Override
     public void run() throws Exception {
-        discoverySplits();
-        assignPendingSplits();
+        synchronized (stateLock) {
+            discoverySplits();
+        }
+        synchronized (stateLock) {
+            assignPendingSplits();
+        }
     }
 
     @Override
@@ -92,7 +97,9 @@ public class MaxcomputeSourceSplitEnumerator
 
     @Override
     public MaxcomputeSourceState snapshotState(long checkpointId) {
-        return new MaxcomputeSourceState(assignedSplits);
+        synchronized (stateLock) {
+            return new MaxcomputeSourceState(assignedSplits);
+        }
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumerator.java
index 1308a73bef..b1c242d682 100644
--- 
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumerator.java
@@ -200,16 +200,14 @@ public class MilvusSourceSplitEnumerator
     @Override
     public void addSplitsBack(List<MilvusSourceSplit> splits, int subtaskId) {
         if (!splits.isEmpty()) {
-            synchronized (stateLock) {
-                addPendingSplit(splits, subtaskId);
-                if (context.registeredReaders().contains(subtaskId)) {
-                    assignSplit(Collections.singletonList(subtaskId));
-                } else {
-                    log.warn(
-                            "Reader {} is not registered. Pending splits {} 
are not assigned.",
-                            subtaskId,
-                            splits);
-                }
+            addPendingSplit(splits, subtaskId);
+            if (context.registeredReaders().contains(subtaskId)) {
+                assignSplit(Collections.singletonList(subtaskId));
+            } else {
+                log.warn(
+                        "Reader {} is not registered. Pending splits {} are 
not assigned.",
+                        subtaskId,
+                        splits);
             }
         }
         log.info("Add back splits {} to JdbcSourceSplitEnumerator.", 
splits.size());
@@ -235,9 +233,7 @@ public class MilvusSourceSplitEnumerator
     public void registerReader(int subtaskId) {
         log.info("Register reader {} to MilvusSourceSplitEnumerator.", 
subtaskId);
         if (!pendingSplits.isEmpty()) {
-            synchronized (stateLock) {
-                assignSplit(Collections.singletonList(subtaskId));
-            }
+            assignSplit(Collections.singletonList(subtaskId));
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/enumerator/MongodbSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/enumerator/MongodbSplitEnumerator.java
index 1aa30b2c16..423092d466 100644
--- 
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/enumerator/MongodbSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/enumerator/MongodbSplitEnumerator.java
@@ -47,7 +47,7 @@ public class MongodbSplitEnumerator
     private final Context<MongoSplit> context;
 
     private final MongodbClientProvider clientProvider;
-
+    private final Object stateLock = new Object();
     private final MongoSplitStrategy strategy;
 
     public MongodbSplitEnumerator(
@@ -74,14 +74,18 @@ public class MongodbSplitEnumerator
     @Override
     public synchronized void run() {
         log.info("Starting MongoSplitEnumerator.");
-        Set<Integer> readers = context.registeredReaders();
-        pendingSplits.addAll(strategy.split());
-        MongoNamespace namespace = 
clientProvider.getDefaultCollection().getNamespace();
-        log.info(
-                "Added {} pending splits for namespace {}.",
-                pendingSplits.size(),
-                namespace.getFullName());
-        assignSplits(readers);
+        synchronized (stateLock) {
+            pendingSplits.addAll(strategy.split());
+            MongoNamespace namespace = 
clientProvider.getDefaultCollection().getNamespace();
+            log.info(
+                    "Added {} pending splits for namespace {}.",
+                    pendingSplits.size(),
+                    namespace.getFullName());
+        }
+        synchronized (stateLock) {
+            Set<Integer> readers = context.registeredReaders();
+            assignSplits(readers);
+        }
     }
 
     @Override
@@ -121,7 +125,9 @@ public class MongodbSplitEnumerator
 
     @Override
     public ArrayList<MongoSplit> snapshotState(long checkpointId) {
-        return pendingSplits;
+        synchronized (stateLock) {
+            return pendingSplits;
+        }
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java
index 887769488b..2d748ad257 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java
@@ -63,6 +63,7 @@ public abstract class AbstractSplitEnumerator
     protected Deque<PaimonSourceSplit> pendingSplits;
 
     protected final TableScan tableScan;
+    protected final Object stateLock = new Object();
 
     private final int splitMaxNum;
 
@@ -98,7 +99,9 @@ public abstract class AbstractSplitEnumerator
 
     @Override
     public void run() throws Exception {
-        loadNewSplits();
+        synchronized (stateLock) {
+            loadNewSplits();
+        }
     }
 
     @Override
@@ -129,7 +132,9 @@ public abstract class AbstractSplitEnumerator
 
     @Override
     public PaimonSourceState snapshotState(long checkpointId) throws Exception 
{
-        return new PaimonSourceState(pendingSplits, nextSnapshotId);
+        synchronized (stateLock) {
+            return new PaimonSourceState(pendingSplits, nextSnapshotId);
+        }
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonBatchSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonBatchSourceSplitEnumerator.java
index b00b38587a..46ecf42756 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonBatchSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonBatchSourceSplitEnumerator.java
@@ -44,7 +44,9 @@ public class PaimonBatchSourceSplitEnumerator extends 
AbstractSplitEnumerator {
 
     @Override
     public void run() throws Exception {
-        this.processDiscoveredSplits(this.scanNextSnapshot(), null);
+        synchronized (stateLock) {
+            this.processDiscoveredSplits(this.scanNextSnapshot(), null);
+        }
         Set<Integer> readers = context.registeredReaders();
         log.debug(
                 "No more splits to assign." + " Sending NoMoreSplitsEvent to 
reader {}.", readers);
@@ -53,7 +55,9 @@ public class PaimonBatchSourceSplitEnumerator extends 
AbstractSplitEnumerator {
 
     @Override
     public PaimonSourceState snapshotState(long checkpointId) throws Exception 
{
-        return new PaimonSourceState(pendingSplits, null);
+        synchronized (stateLock) {
+            return new PaimonSourceState(pendingSplits, null);
+        }
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java
index e5fcef3e7a..9986ae4272 100644
--- 
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java
@@ -61,6 +61,7 @@ public class PulsarSplitEnumerator
     private final long partitionDiscoveryIntervalMs;
     private final StartCursor startCursor;
     private final StopCursor stopCursor;
+    private final Object stateLock = new Object();
 
     /** The consumer group id used for this PulsarSource. */
     private final String subscriptionName;
@@ -152,9 +153,11 @@ public class PulsarSplitEnumerator
     }
 
     private void discoverySplits() {
-        Set<TopicPartition> subscribedTopicPartitions =
-                partitionDiscoverer.getSubscribedTopicPartitions(pulsarAdmin);
-        checkPartitionChanges(subscribedTopicPartitions);
+        synchronized (stateLock) {
+            Set<TopicPartition> subscribedTopicPartitions =
+                    
partitionDiscoverer.getSubscribedTopicPartitions(pulsarAdmin);
+            checkPartitionChanges(subscribedTopicPartitions);
+        }
     }
 
     private void checkPartitionChanges(Set<TopicPartition> fetchedPartitions) {
@@ -299,7 +302,9 @@ public class PulsarSplitEnumerator
 
     @Override
     public PulsarSplitEnumeratorState snapshotState(long checkpointId) throws 
Exception {
-        return new PulsarSplitEnumeratorState(assignedPartitions);
+        synchronized (stateLock) {
+            return new PulsarSplitEnumeratorState(assignedPartitions);
+        }
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
index 45ded447f2..18d60dbba9 100644
--- 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
@@ -58,6 +58,7 @@ public class RocketMqSourceSplitEnumerator
     private final Map<MessageQueue, RocketMqSourceSplit> pendingSplit;
     private ScheduledExecutorService executor;
     private ScheduledFuture scheduledFuture;
+    private final Object lock = new Object();
     // ms
     private long discoveryIntervalMillis;
 
@@ -118,9 +119,14 @@ public class RocketMqSourceSplitEnumerator
 
     @Override
     public void run() throws Exception {
-        fetchPendingPartitionSplit();
-        setPartitionStartOffset();
-        assignSplit();
+        synchronized (lock) {
+            fetchPendingPartitionSplit();
+            setPartitionStartOffset();
+        }
+
+        synchronized (lock) {
+            assignSplit();
+        }
     }
 
     @Override
@@ -159,7 +165,8 @@ public class RocketMqSourceSplitEnumerator
                         
split.setEndOffset(listOffsets.get(split.getMessageQueue()));
                     });
             return splits.stream()
-                    .collect(Collectors.toMap(split -> 
split.getMessageQueue(), split -> split));
+                    .collect(
+                            
Collectors.toMap(RocketMqSourceSplit::getMessageQueue, split -> split));
         } catch (Exception e) {
             throw new RocketMqConnectorException(
                     
RocketMqConnectorErrorCode.ADD_SPLIT_BACK_TO_ENUMERATOR_FAILED, e);
@@ -185,7 +192,9 @@ public class RocketMqSourceSplitEnumerator
 
     @Override
     public RocketMqSourceState snapshotState(long checkpointId) throws 
Exception {
-        return new 
RocketMqSourceState(assignedSplit.values().stream().collect(Collectors.toSet()));
+        synchronized (lock) {
+            return new RocketMqSourceState(new 
HashSet<>(assignedSplit.values()));
+        }
     }
 
     @Override
@@ -194,8 +203,12 @@ public class RocketMqSourceSplitEnumerator
     }
 
     private void discoverySplits() {
-        fetchPendingPartitionSplit();
-        assignSplit();
+        synchronized (lock) {
+            fetchPendingPartitionSplit();
+        }
+        synchronized (lock) {
+            assignSplit();
+        }
     }
 
     private void fetchPendingPartitionSplit() {
diff --git 
a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceSplitEnumerator.java
index f178d441a3..5a872eb7f3 100644
--- 
a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceSplitEnumerator.java
@@ -56,6 +56,7 @@ public class SlsSourceSplitEnumerator
     private final Map<Integer, SlsSourceSplit> pendingSplit;
     private final Map<Integer, SlsSourceSplit> assignedSplit;
 
+    private final Object lock = new Object();
     private SlsSourceState slsSourceState;
 
     private ScheduledExecutorService executor;
@@ -124,8 +125,7 @@ public class SlsSourceSplitEnumerator
 
     @Override
     public void run() throws Exception {
-        fetchPendingShardSplit();
-        assignSplit();
+        discoverySplits();
     }
 
     @Override
@@ -157,8 +157,12 @@ public class SlsSourceSplitEnumerator
     public void notifyCheckpointComplete(long checkpointId) throws Exception {}
 
     private void discoverySplits() throws LogException {
-        fetchPendingShardSplit();
-        assignSplit();
+        synchronized (lock) {
+            fetchPendingShardSplit();
+        }
+        synchronized (lock) {
+            assignSplit();
+        }
     }
 
     private void fetchPendingShardSplit() throws LogException {
@@ -296,7 +300,9 @@ public class SlsSourceSplitEnumerator
 
     @Override
     public SlsSourceState snapshotState(long checkpointId) throws Exception {
-        return new SlsSourceState(new HashSet<>(assignedSplit.values()));
+        synchronized (lock) {
+            return new SlsSourceState(new HashSet<>(assignedSplit.values()));
+        }
     }
 
     public boolean checkConsumerGroupExists(String project, String logstore, 
String consumerGroup)
diff --git 
a/seatunnel-connectors-v2/connector-typesense/src/main/java/org/apache/seatunnel/connectors/seatunnel/typesense/source/TypesenseSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-typesense/src/main/java/org/apache/seatunnel/connectors/seatunnel/typesense/source/TypesenseSourceSplitEnumerator.java
index 0088a32a2b..29125ea380 100644
--- 
a/seatunnel-connectors-v2/connector-typesense/src/main/java/org/apache/seatunnel/connectors/seatunnel/typesense/source/TypesenseSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-typesense/src/main/java/org/apache/seatunnel/connectors/seatunnel/typesense/source/TypesenseSourceSplitEnumerator.java
@@ -20,7 +20,6 @@ package 
org.apache.seatunnel.connectors.seatunnel.typesense.source;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import 
org.apache.seatunnel.connectors.seatunnel.typesense.client.TypesenseClient;
 import 
org.apache.seatunnel.connectors.seatunnel.typesense.config.TypesenseBaseOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.typesense.config.TypesenseSourceOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.typesense.dto.SourceCollectionInfo;
@@ -45,8 +44,6 @@ public class TypesenseSourceSplitEnumerator
 
     private final ReadonlyConfig config;
 
-    private TypesenseClient typesenseClient;
-
     private final Object stateLock = new Object();
 
     private Map<Integer, List<TypesenseSourceSplit>> pendingSplit;
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/java/org/apache/seatunnel/core/starter/flink/multitable/MultiTableSinkTest.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/java/org/apache/seatunnel/core/starter/flink/multitable/MultiTableSinkTest.java
index 1cd1787883..2fcf40d460 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/java/org/apache/seatunnel/core/starter/flink/multitable/MultiTableSinkTest.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/java/org/apache/seatunnel/core/starter/flink/multitable/MultiTableSinkTest.java
@@ -22,6 +22,7 @@ import 
org.apache.seatunnel.core.starter.exception.CommandException;
 import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.e2e.sink.inmemory.InMemoryAggregatedCommitter;
 import org.apache.seatunnel.e2e.sink.inmemory.InMemorySinkWriter;
+import org.apache.seatunnel.e2e.source.inmemory.InMemorySourceSplitEnumerator;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Order;
@@ -41,7 +42,7 @@ public class MultiTableSinkTest {
     @Test
     public void testMultiTableSink()
             throws FileNotFoundException, URISyntaxException, CommandException 
{
-        String configurePath = "/config/fake_to_inmemory_multi_table.conf";
+        String configurePath = "/config/inmemory_to_inmemory_multi_table.conf";
         String configFile = getTestConfigFile(configurePath);
         FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
         flinkCommandArgs.setConfigFile(configFile);
@@ -73,6 +74,10 @@ public class MultiTableSinkTest {
         //        Assertions.assertIterableEquals(
         //                
Collections.singletonList("InMemoryMultiTableResourceManager::close"),
         //                committerResourceManagersEvents);
+
+        Assertions.assertIterableEquals(
+                Arrays.asList("registerReader_0", "run"),
+                InMemorySourceSplitEnumerator.getMethodInvoked());
     }
 
     public static String getTestConfigFile(String configFile)
diff --git 
a/seatunnel-core/seatunnel-starter/src/test/resources/config/fake_to_inmemory_multi_table.conf
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/resources/config/inmemory_to_inmemory_multi_table.conf
similarity index 80%
rename from 
seatunnel-core/seatunnel-starter/src/test/resources/config/fake_to_inmemory_multi_table.conf
rename to 
seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/resources/config/inmemory_to_inmemory_multi_table.conf
index 846f00e7af..4778f59be3 100644
--- 
a/seatunnel-core/seatunnel-starter/src/test/resources/config/fake_to_inmemory_multi_table.conf
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/resources/config/inmemory_to_inmemory_multi_table.conf
@@ -24,17 +24,8 @@ env {
 }
 
 source {
-  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
-  FakeSource {
+  InMemorySource {
     plugin_output = "fake"
-    parallelism = 1
-    schema = {
-      fields {
-        name = "string"
-        age = "int"
-        score = "double"
-      }
-    }
   }
 }
 
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/java/org/apache/seatunnel/core/starter/spark/multitable/MultiTableSinkTest.java
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/java/org/apache/seatunnel/core/starter/spark/multitable/MultiTableSinkTest.java
index 41b4285391..8212b36612 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/java/org/apache/seatunnel/core/starter/spark/multitable/MultiTableSinkTest.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/java/org/apache/seatunnel/core/starter/spark/multitable/MultiTableSinkTest.java
@@ -23,6 +23,7 @@ import 
org.apache.seatunnel.core.starter.exception.CommandException;
 import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
 import org.apache.seatunnel.e2e.sink.inmemory.InMemoryAggregatedCommitter;
 import org.apache.seatunnel.e2e.sink.inmemory.InMemorySinkWriter;
+import org.apache.seatunnel.e2e.source.inmemory.InMemorySourceSplitEnumerator;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Order;
@@ -45,7 +46,7 @@ public class MultiTableSinkTest {
     @Test
     public void testMultiTableSink()
             throws FileNotFoundException, URISyntaxException, CommandException 
{
-        String configurePath = "/config/fake_to_inmemory_multi_table.conf";
+        String configurePath = "/config/inmemory_to_inmemory_multi_table.conf";
         String configFile = getTestConfigFile(configurePath);
         SparkCommandArgs sparkCommandArgs = new SparkCommandArgs();
         sparkCommandArgs.setConfigFile(configFile);
@@ -76,6 +77,10 @@ public class MultiTableSinkTest {
         //        Assertions.assertIterableEquals(
         //            
Collections.singletonList("InMemoryMultiTableResourceManager::close"),
         //            committerResourceManagersEvents);
+
+        Assertions.assertIterableEquals(
+                Arrays.asList("registerReader_0", "run"),
+                InMemorySourceSplitEnumerator.getMethodInvoked());
     }
 
     public static String getTestConfigFile(String configFile)
diff --git 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/resources/config/fake_to_inmemory_multi_table.conf
 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/resources/config/inmemory_to_inmemory_multi_table.conf
similarity index 89%
rename from 
seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/resources/config/fake_to_inmemory_multi_table.conf
rename to 
seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/resources/config/inmemory_to_inmemory_multi_table.conf
index 8fe9317b4a..88473e3779 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/resources/config/fake_to_inmemory_multi_table.conf
+++ 
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/resources/config/inmemory_to_inmemory_multi_table.conf
@@ -29,16 +29,8 @@ env {
 
 source {
   # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
-  FakeSource {
+  InMemorySource {
     plugin_output = "fake"
-    parallelism = 1
-    schema = {
-      fields {
-        name = "string"
-        age = "int"
-        score = "double"
-      }
-    }
   }
 }
 
diff --git 
a/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/multitable/MultiTableSinkTest.java
 
b/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/multitable/MultiTableSinkTest.java
index a542964511..cf39db9c89 100644
--- 
a/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/multitable/MultiTableSinkTest.java
+++ 
b/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/multitable/MultiTableSinkTest.java
@@ -23,6 +23,7 @@ import 
org.apache.seatunnel.core.starter.exception.CommandException;
 import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs;
 import org.apache.seatunnel.e2e.sink.inmemory.InMemoryAggregatedCommitter;
 import org.apache.seatunnel.e2e.sink.inmemory.InMemorySinkWriter;
+import org.apache.seatunnel.e2e.source.inmemory.InMemorySourceSplitEnumerator;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Order;
@@ -45,7 +46,7 @@ public class MultiTableSinkTest {
     @DisabledOnOs(value = {OS.WINDOWS})
     public void testMultiTableSink()
             throws FileNotFoundException, URISyntaxException, CommandException 
{
-        String configurePath = "/config/fake_to_inmemory_multi_table.conf";
+        String configurePath = "/config/inmemory_to_inmemory_multi_table.conf";
         String configFile = getTestConfigFile(configurePath);
         ClientCommandArgs clientCommandArgs = new ClientCommandArgs();
         clientCommandArgs.setConfigFile(configFile);
@@ -74,6 +75,10 @@ public class MultiTableSinkTest {
         Assertions.assertIterableEquals(
                 
Collections.singletonList("InMemoryMultiTableResourceManager::close"),
                 committerResourceManagersEvents);
+
+        Assertions.assertIterableEquals(
+                Arrays.asList("registerReader_0", "run"),
+                InMemorySourceSplitEnumerator.getMethodInvoked());
     }
 
     public static String getTestConfigFile(String configFile)
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/resources/config/fake_to_inmemory_multi_table.conf
 
b/seatunnel-core/seatunnel-starter/src/test/resources/config/inmemory_to_inmemory_multi_table.conf
similarity index 88%
rename from 
seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/resources/config/fake_to_inmemory_multi_table.conf
rename to 
seatunnel-core/seatunnel-starter/src/test/resources/config/inmemory_to_inmemory_multi_table.conf
index 846f00e7af..f805d6508e 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/resources/config/fake_to_inmemory_multi_table.conf
+++ 
b/seatunnel-core/seatunnel-starter/src/test/resources/config/inmemory_to_inmemory_multi_table.conf
@@ -25,16 +25,8 @@ env {
 
 source {
   # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
-  FakeSource {
+  InMemorySource {
     plugin_output = "fake"
-    parallelism = 1
-    schema = {
-      fields {
-        name = "string"
-        age = "int"
-        score = "double"
-      }
-    }
   }
 }
 
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySource.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySource.java
new file mode 100644
index 0000000000..e27f1cffbc
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySource.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.source.inmemory;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import java.util.Collections;
+import java.util.List;
+
+public class InMemorySource
+        implements SeaTunnelSource<SeaTunnelRow, InMemorySourceSplit, 
InMemoryState> {
+
+    private final ReadonlyConfig config;
+
+    public InMemorySource(ReadonlyConfig config) {
+        this.config = config;
+    }
+
+    @Override
+    public String getPluginName() {
+        return "InMemorySource";
+    }
+
+    @Override
+    public List<CatalogTable> getProducedCatalogTables() {
+        return Collections.singletonList(
+                CatalogTable.of(
+                        TableIdentifier.of("e2e", TablePath.DEFAULT),
+                        TableSchema.builder().build(),
+                        Collections.emptyMap(),
+                        Collections.emptyList(),
+                        "InMemorySource"));
+    }
+
+    @Override
+    public SourceReader<SeaTunnelRow, InMemorySourceSplit> createReader(
+            SourceReader.Context readerContext) {
+        return new InMemorySourceReader(Collections.emptyList(), 
readerContext);
+    }
+
+    @Override
+    public SourceSplitEnumerator<InMemorySourceSplit, InMemoryState> 
createEnumerator(
+            SourceSplitEnumerator.Context<InMemorySourceSplit> 
enumeratorContext) {
+        return new InMemorySourceSplitEnumerator(enumeratorContext);
+    }
+
+    @Override
+    public SourceSplitEnumerator<InMemorySourceSplit, InMemoryState> 
restoreEnumerator(
+            SourceSplitEnumerator.Context<InMemorySourceSplit> 
enumeratorContext,
+            InMemoryState checkpointState) {
+        return new InMemorySourceSplitEnumerator(enumeratorContext);
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceFactory.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceFactory.java
new file mode 100644
index 0000000000..4b6ae52f13
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.source.inmemory;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+
+import com.google.auto.service.AutoService;
+
+import java.io.Serializable;
+
+@AutoService(Factory.class)
+public class InMemorySourceFactory implements TableSourceFactory {
+    @Override
+    public String factoryIdentifier() {
+        return "InMemorySource";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder().build();
+    }
+
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        return () -> (SeaTunnelSource<T, SplitT, StateT>) new 
InMemorySource(context.getOptions());
+    }
+
+    @Override
+    public Class<? extends SeaTunnelSource> getSourceClass() {
+        return InMemorySource.class;
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceReader.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceReader.java
new file mode 100644
index 0000000000..b8a366b228
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceReader.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.source.inmemory;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+public class InMemorySourceReader implements SourceReader<SeaTunnelRow, 
InMemorySourceSplit> {
+
+    private final Iterator<SeaTunnelRow> iterator;
+    private final SourceReader.Context context;
+    private final Deque<InMemorySourceSplit> sourceSplits = new 
ConcurrentLinkedDeque<>();
+    private volatile boolean noMoreSplit;
+
+    public InMemorySourceReader(List<SeaTunnelRow> rows, SourceReader.Context 
context) {
+        this.iterator = rows.iterator();
+        this.context = context;
+    }
+
+    @Override
+    public void open() throws Exception {}
+
+    @Override
+    public void close() {}
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+        synchronized (output.getCheckpointLock()) {
+            InMemorySourceSplit split = sourceSplits.poll();
+            if (null != split) {
+                while (iterator.hasNext()) {
+                    SeaTunnelRow row = iterator.next();
+                    output.collect(row);
+                }
+            } else if (noMoreSplit && sourceSplits.isEmpty()) {
+                context.signalNoMoreElement();
+            } else {
+                Thread.sleep(1000L);
+            }
+        }
+    }
+
+    @Override
+    public List<InMemorySourceSplit> snapshotState(long checkpointId) throws 
Exception {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void addSplits(List<InMemorySourceSplit> splits) {
+        sourceSplits.addAll(splits);
+    }
+
+    @Override
+    public void handleNoMoreSplits() {
+        noMoreSplit = true;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {}
+}
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceSplit.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceSplit.java
new file mode 100644
index 0000000000..208eea694e
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceSplit.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.source.inmemory;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+public class InMemorySourceSplit implements SourceSplit {
+
+    private final String splitId;
+
+    public InMemorySourceSplit(String splitId) {
+        this.splitId = splitId;
+    }
+
+    @Override
+    public String splitId() {
+        return splitId;
+    }
+
+    @Override
+    public String toString() {
+        return "InMemorySourceSplit{" + "splitId='" + splitId + '\'' + '}';
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceSplitEnumerator.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceSplitEnumerator.java
new file mode 100644
index 0000000000..e576249aff
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceSplitEnumerator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.source.inmemory;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class InMemorySourceSplitEnumerator
+        implements SourceSplitEnumerator<InMemorySourceSplit, InMemoryState> {
+
+    private final Context<InMemorySourceSplit> context;
+    private final Object lock = new Object();
+
+    public static final List<String> methodInvoked = new ArrayList<>();
+
+    public InMemorySourceSplitEnumerator(Context<InMemorySourceSplit> context) 
{
+        this.context = context;
+    }
+
+    public static List<String> getMethodInvoked() {
+        return methodInvoked;
+    }
+
+    @Override
+    public void open() {}
+
+    @Override
+    public void run() {
+        methodInvoked.add("run");
+        for (int i = 0; i < context.currentParallelism(); i++) {
+            synchronized (lock) {
+                context.assignSplit(i, new InMemorySourceSplit("split-" + i));
+                context.signalNoMoreSplits(i);
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        // do nothing
+    }
+
+    @Override
+    public void addSplitsBack(List<InMemorySourceSplit> splits, int subtaskId) 
{
+        methodInvoked.add("addSplitsBack");
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return -1;
+    }
+
+    @Override
+    public void registerReader(int subtaskId) {
+        methodInvoked.add("registerReader_" + subtaskId);
+    }
+
+    @Override
+    public InMemoryState snapshotState(long checkpointId) {
+        synchronized (lock) {
+            return new InMemoryState();
+        }
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {}
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {}
+}
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemoryState.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemoryState.java
new file mode 100644
index 0000000000..7a8ff4b0bf
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemoryState.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.source.inmemory;
+
+import java.io.Serializable;
+
+public class InMemoryState implements Serializable {}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 2fb46d5dd0..2cfd76c549 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -224,7 +224,9 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
 
         SourceSplitEnumerator<SplitT, Serializable> enumerator = 
getEnumerator();
         this.addTaskMemberMapping(readerId, memberAddr);
-        enumerator.registerReader(readerId.getTaskIndex());
+        synchronized (this) {
+            enumerator.registerReader(readerId.getTaskIndex());
+        }
         int taskSize = taskMemberMapping.size();
         if (maxReaderSize == taskSize) {
             readerRegisterComplete = true;
@@ -303,7 +305,7 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
                 reportTaskStatus(WAITING_RESTORE);
                 break;
             case WAITING_RESTORE:
-                if (restoreComplete.isDone()) {
+                if (restoreComplete.isDone() && readerRegisterComplete) {
                     currState = READY_START;
                     reportTaskStatus(READY_START);
                 } else {
@@ -311,7 +313,7 @@ public class SourceSplitEnumeratorTask<SplitT extends 
SourceSplit> extends Coord
                 }
                 break;
             case READY_START:
-                if (startCalled && readerRegisterComplete) {
+                if (startCalled) {
                     currState = STARTING;
                 } else {
                     Thread.sleep(100);
diff --git 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
index 4e5d864369..842ee173dd 100644
--- 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
+++ 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
@@ -145,22 +145,16 @@ public class CoordinatedSource<T, SplitT extends 
SourceSplit, StateT extends Ser
                 (subtaskId, splits) -> {
                     splitEnumerator.addSplitsBack(splits, subtaskId);
                 });
-        readerMap
-                .entrySet()
-                .parallelStream()
-                .forEach(
-                        entry -> {
-                            try {
-                                entry.getValue().open();
-                                readerContextMap
-                                        .get(entry.getKey())
-                                        .getEventListener()
-                                        .onEvent(new ReaderOpenEvent());
-                                splitEnumerator.registerReader(entry.getKey());
-                            } catch (Exception e) {
-                                throw new RuntimeException(e);
-                            }
-                        });
+        readerMap.forEach(
+                (key, value) -> {
+                    try {
+                        value.open();
+                        
readerContextMap.get(key).getEventListener().onEvent(new ReaderOpenEvent());
+                        splitEnumerator.registerReader(key);
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
     }
 
     @Override
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java
 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java
index 7d8052bfd1..c73cd863da 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java
@@ -33,6 +33,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 /**
@@ -56,7 +57,7 @@ public class FlinkSourceEnumerator<SplitT extends 
SourceSplit, EnumStateT>
 
     private final Object lock = new Object();
 
-    private volatile boolean isRun = false;
+    private AtomicBoolean isRun = new AtomicBoolean(false);
 
     private volatile int currentRegisterReaders = 0;
 
@@ -82,30 +83,33 @@ public class FlinkSourceEnumerator<SplitT extends 
SourceSplit, EnumStateT>
 
     @Override
     public void addSplitsBack(List<SplitWrapper<SplitT>> splits, int 
subtaskId) {
-        sourceSplitEnumerator.addSplitsBack(
-                
splits.stream().map(SplitWrapper::getSourceSplit).collect(Collectors.toList()),
-                subtaskId);
+        synchronized (lock) {
+            sourceSplitEnumerator.addSplitsBack(
+                    
splits.stream().map(SplitWrapper::getSourceSplit).collect(Collectors.toList()),
+                    subtaskId);
+        }
     }
 
     @Override
     public void addReader(int subtaskId) {
-        sourceSplitEnumerator.registerReader(subtaskId);
         synchronized (lock) {
+            sourceSplitEnumerator.registerReader(subtaskId);
             currentRegisterReaders++;
-            if (!isRun && currentRegisterReaders == parallelism) {
-                try {
-                    sourceSplitEnumerator.run();
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-                isRun = true;
+        }
+        if (currentRegisterReaders == parallelism && !isRun.getAndSet(true)) {
+            try {
+                sourceSplitEnumerator.run();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
             }
         }
     }
 
     @Override
     public EnumStateT snapshotState(long checkpointId) throws Exception {
-        return sourceSplitEnumerator.snapshotState(checkpointId);
+        synchronized (lock) {
+            return sourceSplitEnumerator.snapshotState(checkpointId);
+        }
     }
 
     @Override

Reply via email to