This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 9212a77140 [Improve][API] Optimize the enumerator API semantics and
reduce lock calls at the connector level (#9671)
9212a77140 is described below
commit 9212a77140bb9a310237b4ef82126d42b5d767e6
Author: Jia Fan <[email protected]>
AuthorDate: Fri Aug 15 10:38:06 2025 +0800
[Improve][API] Optimize the enumerator API semantics and reduce lock calls
at the connector level (#9671)
---
.../api/source/SourceSplitEnumerator.java | 23 +++++-
.../enumerator/TiDBSourceSplitEnumerator.java | 4 +-
.../split/ClickhouseSourceSplitEnumerator.java | 18 ++---
.../source/split/DorisSourceSplitEnumerator.java | 22 +++---
.../source/split/FileSourceSplitEnumerator.java | 9 ++-
.../MultipleTableFileSourceSplitEnumerator.java | 9 ++-
.../MultipleTableHiveSourceSplitEnumerator.java | 9 ++-
.../source/enumerator/AbstractSplitEnumerator.java | 22 +++---
.../jdbc/source/JdbcSourceSplitEnumerator.java | 22 +++---
.../kafka/source/KafkaSourceSplitEnumerator.java | 16 ++--
.../kudu/source/KuduSourceSplitEnumerator.java | 24 +++---
.../source/MaxcomputeSourceSplitEnumerator.java | 13 +++-
.../milvus/source/MilvusSourceSplitEnumerator.java | 22 +++---
.../source/enumerator/MongodbSplitEnumerator.java | 26 ++++---
.../source/enumerator/AbstractSplitEnumerator.java | 9 ++-
.../PaimonBatchSourceSplitEnumerator.java | 8 +-
.../source/enumerator/PulsarSplitEnumerator.java | 13 +++-
.../source/RocketMqSourceSplitEnumerator.java | 27 +++++--
.../sls/source/SlsSourceSplitEnumerator.java | 16 ++--
.../source/TypesenseSourceSplitEnumerator.java | 3 -
.../flink/multitable/MultiTableSinkTest.java | 7 +-
.../config/inmemory_to_inmemory_multi_table.conf} | 11 +--
.../spark/multitable/MultiTableSinkTest.java | 7 +-
....conf => inmemory_to_inmemory_multi_table.conf} | 10 +--
.../seatunnel/multitable/MultiTableSinkTest.java | 7 +-
.../config/inmemory_to_inmemory_multi_table.conf} | 10 +--
.../e2e/source/inmemory/InMemorySource.java | 82 ++++++++++++++++++++
.../e2e/source/inmemory/InMemorySourceFactory.java | 54 +++++++++++++
.../e2e/source/inmemory/InMemorySourceReader.java | 82 ++++++++++++++++++++
.../e2e/source/inmemory/InMemorySourceSplit.java | 39 ++++++++++
.../inmemory/InMemorySourceSplitEnumerator.java | 88 ++++++++++++++++++++++
.../e2e/source/inmemory/InMemoryState.java | 22 ++++++
.../server/task/SourceSplitEnumeratorTask.java | 8 +-
.../translation/source/CoordinatedSource.java | 26 +++----
.../flink/source/FlinkSourceEnumerator.java | 30 ++++----
35 files changed, 609 insertions(+), 189 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
index 897d18ac5f..d1488d00db 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceSplitEnumerator.java
@@ -38,7 +38,20 @@ public interface SourceSplitEnumerator<SplitT extends
SourceSplit, StateT>
void open();
- /** The method is executed by the engine only once. */
+ /**
+ * Executes engine setup steps in a fixed, non‑concurrent sequence.
+ *
+ * <p>Before the first {@link #run()} invocation, methods are called in
this order:
+ *
+ * <ol>
+ * <li>{@link #open()}
+ * <li>{@link #addSplitsBack(List, int)}
+ * <li>{@link #registerReader(int)}
+ * </ol>
+ *
+ * <p>{@implNote The engine guarantees this invocation order and ensures
there are no
+ * concurrency issues between these calls.}
+ */
void run() throws Exception;
/**
@@ -63,7 +76,13 @@ public interface SourceSplitEnumerator<SplitT extends
SourceSplit, StateT>
void registerReader(int subtaskId);
- /** If the source is bounded, checkpoint is not triggered. */
+ /**
+ * Used to snapshot the state of the enumerator.
+ *
+ * <p><strong>Concurrency Consideration:</strong><br>
+ * This method and {@link #run()} can be invoked concurrently by different
threads.
+ * Systematically manage shared state access to prevent race conditions.
+ */
StateT snapshotState(long checkpointId) throws Exception;
/**
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/enumerator/TiDBSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/enumerator/TiDBSourceSplitEnumerator.java
index 466d056bc1..fc4b7dd332 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/enumerator/TiDBSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-tidb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/tidb/source/enumerator/TiDBSourceSplitEnumerator.java
@@ -211,7 +211,9 @@ public class TiDBSourceSplitEnumerator
*/
@Override
public TiDBSourceCheckpointState snapshotState(long checkpointId) throws
Exception {
- return new TiDBSourceCheckpointState(shouldEnumerate, pendingSplit);
+ synchronized (stateLock) {
+ return new TiDBSourceCheckpointState(shouldEnumerate,
pendingSplit);
+ }
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/ClickhouseSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/ClickhouseSourceSplitEnumerator.java
index 451d814e02..be90e84243 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/ClickhouseSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/split/ClickhouseSourceSplitEnumerator.java
@@ -121,16 +121,14 @@ public class ClickhouseSourceSplitEnumerator
@Override
public void addSplitsBack(List<ClickhouseSourceSplit> splits, int
subtaskId) {
if (!splits.isEmpty()) {
- synchronized (stateLock) {
- addPendingSplit(splits, subtaskId);
- if (context.registeredReaders().contains(subtaskId)) {
- assignSplit(Collections.singletonList(subtaskId));
- } else {
- LOG.warn(
- "Reader {} is not registered. Pending splits {}
are not assigned.",
- subtaskId,
- splits);
- }
+ addPendingSplit(splits, subtaskId);
+ if (context.registeredReaders().contains(subtaskId)) {
+ assignSplit(Collections.singletonList(subtaskId));
+ } else {
+ LOG.warn(
+ "Reader {} is not registered. Pending splits {} are
not assigned.",
+ subtaskId,
+ splits);
}
}
LOG.info("Add back splits {} to JdbcSourceSplitEnumerator.",
splits.size());
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/split/DorisSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/split/DorisSourceSplitEnumerator.java
index af18ac5629..c2f7cdf007 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/split/DorisSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/split/DorisSourceSplitEnumerator.java
@@ -107,16 +107,14 @@ public class DorisSourceSplitEnumerator
public void addSplitsBack(List<DorisSourceSplit> splits, int subtaskId) {
log.debug("Add back splits {} to DorisSourceSplitEnumerator.", splits);
if (!splits.isEmpty()) {
- synchronized (stateLock) {
- addPendingSplit(splits);
- if (context.registeredReaders().contains(subtaskId)) {
- assignSplit(Collections.singletonList(subtaskId));
- } else {
- log.warn(
- "Reader {} is not registered. Pending splits {}
are not assigned.",
- subtaskId,
- splits);
- }
+ addPendingSplit(splits);
+ if (context.registeredReaders().contains(subtaskId)) {
+ assignSplit(Collections.singletonList(subtaskId));
+ } else {
+ log.warn(
+ "Reader {} is not registered. Pending splits {} are
not assigned.",
+ subtaskId,
+ splits);
}
}
}
@@ -137,9 +135,7 @@ public class DorisSourceSplitEnumerator
public void registerReader(int subtaskId) {
log.debug("Register reader {} to DorisSourceSplitEnumerator.",
subtaskId);
if (!pendingSplit.isEmpty()) {
- synchronized (stateLock) {
- assignSplit(Collections.singletonList(subtaskId));
- }
+ assignSplit(Collections.singletonList(subtaskId));
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java
index 544c759e21..dadb7066cf 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java
@@ -43,6 +43,7 @@ public class FileSourceSplitEnumerator
new TreeSet<>(Comparator.comparing(FileSourceSplit::splitId));
private Set<FileSourceSplit> assignedSplit;
private final List<String> filePaths;
+ private final Object lock = new Object();
private final AtomicInteger assignCount = new AtomicInteger(0);
public FileSourceSplitEnumerator(
@@ -69,7 +70,9 @@ public class FileSourceSplitEnumerator
public void run() {
for (int i = 0; i < context.currentParallelism(); i++) {
LOGGER.info("Assigned splits to reader [{}]", i);
- assignSplit(i);
+ synchronized (lock) {
+ assignSplit(i);
+ }
}
}
@@ -139,7 +142,9 @@ public class FileSourceSplitEnumerator
@Override
public FileSourceState snapshotState(long checkpointId) {
- return new FileSourceState(assignedSplit);
+ synchronized (lock) {
+ return new FileSourceState(assignedSplit);
+ }
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java
index 4f5938b85a..bfa6024479 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java
@@ -46,6 +46,7 @@ public class MultipleTableFileSourceSplitEnumerator
private final Set<FileSourceSplit> assignedSplit;
private final Map<String, List<String>> filePathMap;
private final AtomicInteger assignCount = new AtomicInteger(0);
+ private final Object lock = new Object();
public MultipleTableFileSourceSplitEnumerator(
Context<FileSourceSplit> context,
@@ -107,7 +108,9 @@ public class MultipleTableFileSourceSplitEnumerator
@Override
public FileSourceState snapshotState(long checkpointId) {
- return new FileSourceState(assignedSplit);
+ synchronized (lock) {
+ return new FileSourceState(assignedSplit);
+ }
}
@Override
@@ -155,7 +158,9 @@ public class MultipleTableFileSourceSplitEnumerator
public void run() throws Exception {
for (int i = 0; i < context.currentParallelism(); i++) {
log.info("Assigned splits to reader [{}]", i);
- assignSplit(i);
+ synchronized (lock) {
+ assignSplit(i);
+ }
}
}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/MultipleTableHiveSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/MultipleTableHiveSourceSplitEnumerator.java
index d927f83436..96a8779e26 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/MultipleTableHiveSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/MultipleTableHiveSourceSplitEnumerator.java
@@ -47,6 +47,7 @@ public class MultipleTableHiveSourceSplitEnumerator
private final Set<FileSourceSplit> assignedSplit;
private final Map<String, List<String>> filePathMap;
private final AtomicInteger assignCount = new AtomicInteger(0);
+ private final Object lock = new Object();
public MultipleTableHiveSourceSplitEnumerator(
SourceSplitEnumerator.Context<FileSourceSplit> context,
@@ -108,7 +109,9 @@ public class MultipleTableHiveSourceSplitEnumerator
@Override
public FileSourceState snapshotState(long checkpointId) {
- return new FileSourceState(assignedSplit);
+ synchronized (lock) {
+ return new FileSourceState(assignedSplit);
+ }
}
@Override
@@ -156,7 +159,9 @@ public class MultipleTableHiveSourceSplitEnumerator
public void run() throws Exception {
for (int i = 0; i < context.currentParallelism(); i++) {
log.info("Assigned splits to reader [{}]", i);
- assignSplit(i);
+ synchronized (lock) {
+ assignSplit(i);
+ }
}
}
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
index 4dff9e67c9..39c874a63b 100644
---
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/AbstractSplitEnumerator.java
@@ -131,16 +131,14 @@ public abstract class AbstractSplitEnumerator
@Override
public void addSplitsBack(List<IcebergFileScanTaskSplit> splits, int
subtaskId) {
if (!splits.isEmpty()) {
- synchronized (stateLock) {
- addPendingSplits(splits);
- if (context.registeredReaders().contains(subtaskId)) {
- assignPendingSplits(Collections.singleton(subtaskId));
- } else {
- log.warn(
- "Reader {} is not registered. Pending splits {}
are not assigned.",
- subtaskId,
- splits);
- }
+ addPendingSplits(splits);
+ if (context.registeredReaders().contains(subtaskId)) {
+ assignPendingSplits(Collections.singleton(subtaskId));
+ } else {
+ log.warn(
+ "Reader {} is not registered. Pending splits {} are
not assigned.",
+ subtaskId,
+ splits);
}
}
log.info("Add back splits {} to JdbcSourceSplitEnumerator.",
splits.size());
@@ -163,9 +161,7 @@ public abstract class AbstractSplitEnumerator
@Override
public void registerReader(int subtaskId) {
log.debug("Adding reader {} to IcebergSourceEnumerator.", subtaskId);
- synchronized (stateLock) {
- assignPendingSplits(Collections.singleton(subtaskId));
- }
+ assignPendingSplits(Collections.singleton(subtaskId));
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
index 303e84678d..a0948dfe0d 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceSplitEnumerator.java
@@ -103,16 +103,14 @@ public class JdbcSourceSplitEnumerator
@Override
public void addSplitsBack(List<JdbcSourceSplit> splits, int subtaskId) {
if (!splits.isEmpty()) {
- synchronized (stateLock) {
- addPendingSplit(splits, subtaskId);
- if (context.registeredReaders().contains(subtaskId)) {
- assignSplit(Collections.singletonList(subtaskId));
- } else {
- LOG.warn(
- "Reader {} is not registered. Pending splits {}
are not assigned.",
- subtaskId,
- splits);
- }
+ addPendingSplit(splits, subtaskId);
+ if (context.registeredReaders().contains(subtaskId)) {
+ assignSplit(Collections.singletonList(subtaskId));
+ } else {
+ LOG.warn(
+ "Reader {} is not registered. Pending splits {} are
not assigned.",
+ subtaskId,
+ splits);
}
}
LOG.info("Add back splits {} to JdbcSourceSplitEnumerator.",
splits.size());
@@ -134,9 +132,7 @@ public class JdbcSourceSplitEnumerator
public void registerReader(int subtaskId) {
LOG.info("Register reader {} to JdbcSourceSplitEnumerator.",
subtaskId);
if (!pendingSplits.isEmpty()) {
- synchronized (stateLock) {
- assignSplit(Collections.singletonList(subtaskId));
- }
+ assignSplit(Collections.singletonList(subtaskId));
}
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index 024f6416a8..26c61ddb2e 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -69,7 +69,7 @@ public class KafkaSourceSplitEnumerator
private ScheduledExecutorService executor;
private ScheduledFuture<?> scheduledFuture;
private volatile boolean initialized;
-
+ private final Object lock = new Object();
private final Map<String, TablePath> topicMappingTablePathMap = new
HashMap<>();
private boolean isStreamingMode;
@@ -145,9 +145,13 @@ public class KafkaSourceSplitEnumerator
@Override
public void run() throws ExecutionException, InterruptedException {
- fetchPendingPartitionSplit();
- setPartitionStartOffset();
- assignSplit();
+ synchronized (lock) {
+ fetchPendingPartitionSplit();
+ setPartitionStartOffset();
+ }
+ synchronized (lock) {
+ assignSplit();
+ }
if (!initialized) {
initialized = true;
}
@@ -288,7 +292,9 @@ public class KafkaSourceSplitEnumerator
@Override
public KafkaSourceState snapshotState(long checkpointId) throws Exception {
- return new KafkaSourceState(new HashSet<>(assignedSplit.values()));
+ synchronized (lock) {
+ return new KafkaSourceState(new HashSet<>(assignedSplit.values()));
+ }
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplitEnumerator.java
index ee6574c274..469332da1c 100644
---
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceSplitEnumerator.java
@@ -124,16 +124,14 @@ public class KuduSourceSplitEnumerator
@Override
public void addSplitsBack(List<KuduSourceSplit> splits, int subtaskId) {
if (!splits.isEmpty()) {
- synchronized (stateLock) {
- addPendingSplit(splits, subtaskId);
- if (enumeratorContext.registeredReaders().contains(subtaskId))
{
- assignSplit(Collections.singletonList(subtaskId));
- } else {
- log.warn(
- "Reader {} is not registered. Pending splits {}
are not assigned.",
- subtaskId,
- splits);
- }
+ addPendingSplit(splits, subtaskId);
+ if (enumeratorContext.registeredReaders().contains(subtaskId)) {
+ assignSplit(Collections.singletonList(subtaskId));
+ } else {
+ log.warn(
+ "Reader {} is not registered. Pending splits {} are
not assigned.",
+ subtaskId,
+ splits);
}
}
log.info("Add back splits {} to JdbcSourceSplitEnumerator.",
splits.size());
@@ -192,10 +190,8 @@ public class KuduSourceSplitEnumerator
@Override
public void registerReader(int subtaskId) {
log.debug("Register reader {} to KuduSourceSplitEnumerator.",
subtaskId);
- synchronized (stateLock) {
- if (!pendingSplits.isEmpty()) {
- assignSplit(Collections.singletonList(subtaskId));
- }
+ if (!pendingSplits.isEmpty()) {
+ assignSplit(Collections.singletonList(subtaskId));
}
}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java
index f44f27cb0c..e6685f0f85 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceSplitEnumerator.java
@@ -44,6 +44,7 @@ public class MaxcomputeSourceSplitEnumerator
private Set<MaxcomputeSourceSplit> assignedSplits;
private final ReadonlyConfig readonlyConfig;
private final Map<TablePath, SourceTableInfo> sourceTableInfos;
+ private final Object stateLock = new Object();
public MaxcomputeSourceSplitEnumerator(
SourceSplitEnumerator.Context<MaxcomputeSourceSplit>
enumeratorContext,
@@ -70,8 +71,12 @@ public class MaxcomputeSourceSplitEnumerator
@Override
public void run() throws Exception {
- discoverySplits();
- assignPendingSplits();
+ synchronized (stateLock) {
+ discoverySplits();
+ }
+ synchronized (stateLock) {
+ assignPendingSplits();
+ }
}
@Override
@@ -92,7 +97,9 @@ public class MaxcomputeSourceSplitEnumerator
@Override
public MaxcomputeSourceState snapshotState(long checkpointId) {
- return new MaxcomputeSourceState(assignedSplits);
+ synchronized (stateLock) {
+ return new MaxcomputeSourceState(assignedSplits);
+ }
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumerator.java
index 1308a73bef..b1c242d682 100644
---
a/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-milvus/src/main/java/org/apache/seatunnel/connectors/seatunnel/milvus/source/MilvusSourceSplitEnumerator.java
@@ -200,16 +200,14 @@ public class MilvusSourceSplitEnumerator
@Override
public void addSplitsBack(List<MilvusSourceSplit> splits, int subtaskId) {
if (!splits.isEmpty()) {
- synchronized (stateLock) {
- addPendingSplit(splits, subtaskId);
- if (context.registeredReaders().contains(subtaskId)) {
- assignSplit(Collections.singletonList(subtaskId));
- } else {
- log.warn(
- "Reader {} is not registered. Pending splits {}
are not assigned.",
- subtaskId,
- splits);
- }
+ addPendingSplit(splits, subtaskId);
+ if (context.registeredReaders().contains(subtaskId)) {
+ assignSplit(Collections.singletonList(subtaskId));
+ } else {
+ log.warn(
+ "Reader {} is not registered. Pending splits {} are
not assigned.",
+ subtaskId,
+ splits);
}
}
log.info("Add back splits {} to JdbcSourceSplitEnumerator.",
splits.size());
@@ -235,9 +233,7 @@ public class MilvusSourceSplitEnumerator
public void registerReader(int subtaskId) {
log.info("Register reader {} to MilvusSourceSplitEnumerator.",
subtaskId);
if (!pendingSplits.isEmpty()) {
- synchronized (stateLock) {
- assignSplit(Collections.singletonList(subtaskId));
- }
+ assignSplit(Collections.singletonList(subtaskId));
}
}
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/enumerator/MongodbSplitEnumerator.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/enumerator/MongodbSplitEnumerator.java
index 1aa30b2c16..423092d466 100644
---
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/enumerator/MongodbSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/enumerator/MongodbSplitEnumerator.java
@@ -47,7 +47,7 @@ public class MongodbSplitEnumerator
private final Context<MongoSplit> context;
private final MongodbClientProvider clientProvider;
-
+ private final Object stateLock = new Object();
private final MongoSplitStrategy strategy;
public MongodbSplitEnumerator(
@@ -74,14 +74,18 @@ public class MongodbSplitEnumerator
@Override
public synchronized void run() {
log.info("Starting MongoSplitEnumerator.");
- Set<Integer> readers = context.registeredReaders();
- pendingSplits.addAll(strategy.split());
- MongoNamespace namespace =
clientProvider.getDefaultCollection().getNamespace();
- log.info(
- "Added {} pending splits for namespace {}.",
- pendingSplits.size(),
- namespace.getFullName());
- assignSplits(readers);
+ synchronized (stateLock) {
+ pendingSplits.addAll(strategy.split());
+ MongoNamespace namespace =
clientProvider.getDefaultCollection().getNamespace();
+ log.info(
+ "Added {} pending splits for namespace {}.",
+ pendingSplits.size(),
+ namespace.getFullName());
+ }
+ synchronized (stateLock) {
+ Set<Integer> readers = context.registeredReaders();
+ assignSplits(readers);
+ }
}
@Override
@@ -121,7 +125,9 @@ public class MongodbSplitEnumerator
@Override
public ArrayList<MongoSplit> snapshotState(long checkpointId) {
- return pendingSplits;
+ synchronized (stateLock) {
+ return pendingSplits;
+ }
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java
index 887769488b..2d748ad257 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java
@@ -63,6 +63,7 @@ public abstract class AbstractSplitEnumerator
protected Deque<PaimonSourceSplit> pendingSplits;
protected final TableScan tableScan;
+ protected final Object stateLock = new Object();
private final int splitMaxNum;
@@ -98,7 +99,9 @@ public abstract class AbstractSplitEnumerator
@Override
public void run() throws Exception {
- loadNewSplits();
+ synchronized (stateLock) {
+ loadNewSplits();
+ }
}
@Override
@@ -129,7 +132,9 @@ public abstract class AbstractSplitEnumerator
@Override
public PaimonSourceState snapshotState(long checkpointId) throws Exception
{
- return new PaimonSourceState(pendingSplits, nextSnapshotId);
+ synchronized (stateLock) {
+ return new PaimonSourceState(pendingSplits, nextSnapshotId);
+ }
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonBatchSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonBatchSourceSplitEnumerator.java
index b00b38587a..46ecf42756 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonBatchSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonBatchSourceSplitEnumerator.java
@@ -44,7 +44,9 @@ public class PaimonBatchSourceSplitEnumerator extends
AbstractSplitEnumerator {
@Override
public void run() throws Exception {
- this.processDiscoveredSplits(this.scanNextSnapshot(), null);
+ synchronized (stateLock) {
+ this.processDiscoveredSplits(this.scanNextSnapshot(), null);
+ }
Set<Integer> readers = context.registeredReaders();
log.debug(
"No more splits to assign." + " Sending NoMoreSplitsEvent to
reader {}.", readers);
@@ -53,7 +55,9 @@ public class PaimonBatchSourceSplitEnumerator extends
AbstractSplitEnumerator {
@Override
public PaimonSourceState snapshotState(long checkpointId) throws Exception
{
- return new PaimonSourceState(pendingSplits, null);
+ synchronized (stateLock) {
+ return new PaimonSourceState(pendingSplits, null);
+ }
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java
index e5fcef3e7a..9986ae4272 100644
---
a/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-pulsar/src/main/java/org/apache/seatunnel/connectors/seatunnel/pulsar/source/enumerator/PulsarSplitEnumerator.java
@@ -61,6 +61,7 @@ public class PulsarSplitEnumerator
private final long partitionDiscoveryIntervalMs;
private final StartCursor startCursor;
private final StopCursor stopCursor;
+ private final Object stateLock = new Object();
/** The consumer group id used for this PulsarSource. */
private final String subscriptionName;
@@ -152,9 +153,11 @@ public class PulsarSplitEnumerator
}
private void discoverySplits() {
- Set<TopicPartition> subscribedTopicPartitions =
- partitionDiscoverer.getSubscribedTopicPartitions(pulsarAdmin);
- checkPartitionChanges(subscribedTopicPartitions);
+ synchronized (stateLock) {
+ Set<TopicPartition> subscribedTopicPartitions =
+
partitionDiscoverer.getSubscribedTopicPartitions(pulsarAdmin);
+ checkPartitionChanges(subscribedTopicPartitions);
+ }
}
private void checkPartitionChanges(Set<TopicPartition> fetchedPartitions) {
@@ -299,7 +302,9 @@ public class PulsarSplitEnumerator
@Override
public PulsarSplitEnumeratorState snapshotState(long checkpointId) throws
Exception {
- return new PulsarSplitEnumeratorState(assignedPartitions);
+ synchronized (stateLock) {
+ return new PulsarSplitEnumeratorState(assignedPartitions);
+ }
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
index 45ded447f2..18d60dbba9 100644
---
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java
@@ -58,6 +58,7 @@ public class RocketMqSourceSplitEnumerator
private final Map<MessageQueue, RocketMqSourceSplit> pendingSplit;
private ScheduledExecutorService executor;
private ScheduledFuture scheduledFuture;
+ private final Object lock = new Object();
// ms
private long discoveryIntervalMillis;
@@ -118,9 +119,14 @@ public class RocketMqSourceSplitEnumerator
@Override
public void run() throws Exception {
- fetchPendingPartitionSplit();
- setPartitionStartOffset();
- assignSplit();
+ synchronized (lock) {
+ fetchPendingPartitionSplit();
+ setPartitionStartOffset();
+ }
+
+ synchronized (lock) {
+ assignSplit();
+ }
}
@Override
@@ -159,7 +165,8 @@ public class RocketMqSourceSplitEnumerator
split.setEndOffset(listOffsets.get(split.getMessageQueue()));
});
return splits.stream()
- .collect(Collectors.toMap(split -> split.getMessageQueue(), split -> split));
+ .collect(
+ Collectors.toMap(RocketMqSourceSplit::getMessageQueue, split -> split));
} catch (Exception e) {
throw new RocketMqConnectorException(
RocketMqConnectorErrorCode.ADD_SPLIT_BACK_TO_ENUMERATOR_FAILED, e);
@@ -185,7 +192,9 @@ public class RocketMqSourceSplitEnumerator
@Override
public RocketMqSourceState snapshotState(long checkpointId) throws Exception {
- return new RocketMqSourceState(assignedSplit.values().stream().collect(Collectors.toSet()));
+ synchronized (lock) {
+ return new RocketMqSourceState(new HashSet<>(assignedSplit.values()));
+ }
}
@Override
@@ -194,8 +203,12 @@ public class RocketMqSourceSplitEnumerator
}
private void discoverySplits() {
- fetchPendingPartitionSplit();
- assignSplit();
+ synchronized (lock) {
+ fetchPendingPartitionSplit();
+ }
+ synchronized (lock) {
+ assignSplit();
+ }
}
private void fetchPendingPartitionSplit() {
diff --git a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceSplitEnumerator.java
index f178d441a3..5a872eb7f3 100644
--- a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceSplitEnumerator.java
@@ -56,6 +56,7 @@ public class SlsSourceSplitEnumerator
private final Map<Integer, SlsSourceSplit> pendingSplit;
private final Map<Integer, SlsSourceSplit> assignedSplit;
+ private final Object lock = new Object();
private SlsSourceState slsSourceState;
private ScheduledExecutorService executor;
@@ -124,8 +125,7 @@ public class SlsSourceSplitEnumerator
@Override
public void run() throws Exception {
- fetchPendingShardSplit();
- assignSplit();
+ discoverySplits();
}
@Override
@@ -157,8 +157,12 @@ public class SlsSourceSplitEnumerator
public void notifyCheckpointComplete(long checkpointId) throws Exception {}
private void discoverySplits() throws LogException {
- fetchPendingShardSplit();
- assignSplit();
+ synchronized (lock) {
+ fetchPendingShardSplit();
+ }
+ synchronized (lock) {
+ assignSplit();
+ }
}
private void fetchPendingShardSplit() throws LogException {
@@ -296,7 +300,9 @@ public class SlsSourceSplitEnumerator
@Override
public SlsSourceState snapshotState(long checkpointId) throws Exception {
- return new SlsSourceState(new HashSet<>(assignedSplit.values()));
+ synchronized (lock) {
+ return new SlsSourceState(new HashSet<>(assignedSplit.values()));
+ }
}
public boolean checkConsumerGroupExists(String project, String logstore, String consumerGroup)
diff --git a/seatunnel-connectors-v2/connector-typesense/src/main/java/org/apache/seatunnel/connectors/seatunnel/typesense/source/TypesenseSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-typesense/src/main/java/org/apache/seatunnel/connectors/seatunnel/typesense/source/TypesenseSourceSplitEnumerator.java
index 0088a32a2b..29125ea380 100644
--- a/seatunnel-connectors-v2/connector-typesense/src/main/java/org/apache/seatunnel/connectors/seatunnel/typesense/source/TypesenseSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-typesense/src/main/java/org/apache/seatunnel/connectors/seatunnel/typesense/source/TypesenseSourceSplitEnumerator.java
@@ -20,7 +20,6 @@ package org.apache.seatunnel.connectors.seatunnel.typesense.source;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import org.apache.seatunnel.connectors.seatunnel.typesense.client.TypesenseClient;
import org.apache.seatunnel.connectors.seatunnel.typesense.config.TypesenseBaseOptions;
import org.apache.seatunnel.connectors.seatunnel.typesense.config.TypesenseSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.typesense.dto.SourceCollectionInfo;
@@ -45,8 +44,6 @@ public class TypesenseSourceSplitEnumerator
private final ReadonlyConfig config;
- private TypesenseClient typesenseClient;
-
private final Object stateLock = new Object();
private Map<Integer, List<TypesenseSourceSplit>> pendingSplit;
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/java/org/apache/seatunnel/core/starter/flink/multitable/MultiTableSinkTest.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/java/org/apache/seatunnel/core/starter/flink/multitable/MultiTableSinkTest.java
index 1cd1787883..2fcf40d460 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/java/org/apache/seatunnel/core/starter/flink/multitable/MultiTableSinkTest.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/java/org/apache/seatunnel/core/starter/flink/multitable/MultiTableSinkTest.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.core.starter.exception.CommandException;
import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.e2e.sink.inmemory.InMemoryAggregatedCommitter;
import org.apache.seatunnel.e2e.sink.inmemory.InMemorySinkWriter;
+import org.apache.seatunnel.e2e.source.inmemory.InMemorySourceSplitEnumerator;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Order;
@@ -41,7 +42,7 @@ public class MultiTableSinkTest {
@Test
public void testMultiTableSink()
throws FileNotFoundException, URISyntaxException, CommandException {
- String configurePath = "/config/fake_to_inmemory_multi_table.conf";
+ String configurePath = "/config/inmemory_to_inmemory_multi_table.conf";
String configFile = getTestConfigFile(configurePath);
FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
flinkCommandArgs.setConfigFile(configFile);
@@ -73,6 +74,10 @@ public class MultiTableSinkTest {
// Assertions.assertIterableEquals(
// Collections.singletonList("InMemoryMultiTableResourceManager::close"),
// committerResourceManagersEvents);
+
+ Assertions.assertIterableEquals(
+ Arrays.asList("registerReader_0", "run"),
+ InMemorySourceSplitEnumerator.getMethodInvoked());
}
public static String getTestConfigFile(String configFile)
diff --git a/seatunnel-core/seatunnel-starter/src/test/resources/config/fake_to_inmemory_multi_table.conf b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/resources/config/inmemory_to_inmemory_multi_table.conf
similarity index 80%
rename from seatunnel-core/seatunnel-starter/src/test/resources/config/fake_to_inmemory_multi_table.conf
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/resources/config/inmemory_to_inmemory_multi_table.conf
index 846f00e7af..4778f59be3 100644
--- a/seatunnel-core/seatunnel-starter/src/test/resources/config/fake_to_inmemory_multi_table.conf
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/resources/config/inmemory_to_inmemory_multi_table.conf
@@ -24,17 +24,8 @@ env {
}
source {
- # This is a example source plugin **only for test and demonstrate the feature source plugin**
- FakeSource {
+ InMemorySource {
plugin_output = "fake"
- parallelism = 1
- schema = {
- fields {
- name = "string"
- age = "int"
- score = "double"
- }
- }
}
}
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/java/org/apache/seatunnel/core/starter/spark/multitable/MultiTableSinkTest.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/java/org/apache/seatunnel/core/starter/spark/multitable/MultiTableSinkTest.java
index 41b4285391..8212b36612 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/java/org/apache/seatunnel/core/starter/spark/multitable/MultiTableSinkTest.java
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/java/org/apache/seatunnel/core/starter/spark/multitable/MultiTableSinkTest.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.core.starter.exception.CommandException;
import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
import org.apache.seatunnel.e2e.sink.inmemory.InMemoryAggregatedCommitter;
import org.apache.seatunnel.e2e.sink.inmemory.InMemorySinkWriter;
+import org.apache.seatunnel.e2e.source.inmemory.InMemorySourceSplitEnumerator;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Order;
@@ -45,7 +46,7 @@ public class MultiTableSinkTest {
@Test
public void testMultiTableSink()
throws FileNotFoundException, URISyntaxException, CommandException {
- String configurePath = "/config/fake_to_inmemory_multi_table.conf";
+ String configurePath = "/config/inmemory_to_inmemory_multi_table.conf";
String configFile = getTestConfigFile(configurePath);
SparkCommandArgs sparkCommandArgs = new SparkCommandArgs();
sparkCommandArgs.setConfigFile(configFile);
@@ -76,6 +77,10 @@ public class MultiTableSinkTest {
// Assertions.assertIterableEquals(
// Collections.singletonList("InMemoryMultiTableResourceManager::close"),
// committerResourceManagersEvents);
+
+ Assertions.assertIterableEquals(
+ Arrays.asList("registerReader_0", "run"),
+ InMemorySourceSplitEnumerator.getMethodInvoked());
}
public static String getTestConfigFile(String configFile)
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/resources/config/fake_to_inmemory_multi_table.conf b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/resources/config/inmemory_to_inmemory_multi_table.conf
similarity index 89%
rename from seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/resources/config/fake_to_inmemory_multi_table.conf
rename to seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/resources/config/inmemory_to_inmemory_multi_table.conf
index 8fe9317b4a..88473e3779 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/resources/config/fake_to_inmemory_multi_table.conf
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/test/resources/config/inmemory_to_inmemory_multi_table.conf
@@ -29,16 +29,8 @@ env {
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
- FakeSource {
+ InMemorySource {
plugin_output = "fake"
- parallelism = 1
- schema = {
- fields {
- name = "string"
- age = "int"
- score = "double"
- }
- }
}
}
diff --git a/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/multitable/MultiTableSinkTest.java b/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/multitable/MultiTableSinkTest.java
index a542964511..cf39db9c89 100644
--- a/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/multitable/MultiTableSinkTest.java
+++ b/seatunnel-core/seatunnel-starter/src/test/java/org/apache/seatunnel/core/starter/seatunnel/multitable/MultiTableSinkTest.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.core.starter.exception.CommandException;
import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs;
import org.apache.seatunnel.e2e.sink.inmemory.InMemoryAggregatedCommitter;
import org.apache.seatunnel.e2e.sink.inmemory.InMemorySinkWriter;
+import org.apache.seatunnel.e2e.source.inmemory.InMemorySourceSplitEnumerator;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Order;
@@ -45,7 +46,7 @@ public class MultiTableSinkTest {
@DisabledOnOs(value = {OS.WINDOWS})
public void testMultiTableSink()
throws FileNotFoundException, URISyntaxException, CommandException {
- String configurePath = "/config/fake_to_inmemory_multi_table.conf";
+ String configurePath = "/config/inmemory_to_inmemory_multi_table.conf";
String configFile = getTestConfigFile(configurePath);
ClientCommandArgs clientCommandArgs = new ClientCommandArgs();
clientCommandArgs.setConfigFile(configFile);
@@ -74,6 +75,10 @@ public class MultiTableSinkTest {
Assertions.assertIterableEquals(
Collections.singletonList("InMemoryMultiTableResourceManager::close"),
committerResourceManagersEvents);
+
+ Assertions.assertIterableEquals(
+ Arrays.asList("registerReader_0", "run"),
+ InMemorySourceSplitEnumerator.getMethodInvoked());
}
public static String getTestConfigFile(String configFile)
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/resources/config/fake_to_inmemory_multi_table.conf b/seatunnel-core/seatunnel-starter/src/test/resources/config/inmemory_to_inmemory_multi_table.conf
similarity index 88%
rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/resources/config/fake_to_inmemory_multi_table.conf
rename to seatunnel-core/seatunnel-starter/src/test/resources/config/inmemory_to_inmemory_multi_table.conf
index 846f00e7af..f805d6508e 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/test/resources/config/fake_to_inmemory_multi_table.conf
+++ b/seatunnel-core/seatunnel-starter/src/test/resources/config/inmemory_to_inmemory_multi_table.conf
@@ -25,16 +25,8 @@ env {
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
- FakeSource {
+ InMemorySource {
plugin_output = "fake"
- parallelism = 1
- schema = {
- fields {
- name = "string"
- age = "int"
- score = "double"
- }
- }
}
}
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySource.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySource.java
new file mode 100644
index 0000000000..e27f1cffbc
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySource.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.source.inmemory;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import java.util.Collections;
+import java.util.List;
+
+public class InMemorySource
+ implements SeaTunnelSource<SeaTunnelRow, InMemorySourceSplit, InMemoryState> {
+
+ private final ReadonlyConfig config;
+
+ public InMemorySource(ReadonlyConfig config) {
+ this.config = config;
+ }
+
+ @Override
+ public String getPluginName() {
+ return "InMemorySource";
+ }
+
+ @Override
+ public List<CatalogTable> getProducedCatalogTables() {
+ return Collections.singletonList(
+ CatalogTable.of(
+ TableIdentifier.of("e2e", TablePath.DEFAULT),
+ TableSchema.builder().build(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ "InMemorySource"));
+ }
+
+ @Override
+ public SourceReader<SeaTunnelRow, InMemorySourceSplit> createReader(
+ SourceReader.Context readerContext) {
+ return new InMemorySourceReader(Collections.emptyList(), readerContext);
+ }
+
+ @Override
+ public SourceSplitEnumerator<InMemorySourceSplit, InMemoryState> createEnumerator(
+ SourceSplitEnumerator.Context<InMemorySourceSplit> enumeratorContext) {
+ return new InMemorySourceSplitEnumerator(enumeratorContext);
+ }
+
+ @Override
+ public SourceSplitEnumerator<InMemorySourceSplit, InMemoryState> restoreEnumerator(
+ SourceSplitEnumerator.Context<InMemorySourceSplit> enumeratorContext,
+ InMemoryState checkpointState) {
+ return new InMemorySourceSplitEnumerator(enumeratorContext);
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceFactory.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceFactory.java
new file mode 100644
index 0000000000..4b6ae52f13
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.source.inmemory;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+
+import com.google.auto.service.AutoService;
+
+import java.io.Serializable;
+
+@AutoService(Factory.class)
+public class InMemorySourceFactory implements TableSourceFactory {
+ @Override
+ public String factoryIdentifier() {
+ return "InMemorySource";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder().build();
+ }
+
+ @Override
+ public <T, SplitT extends SourceSplit, StateT extends Serializable>
+ TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
+ return () -> (SeaTunnelSource<T, SplitT, StateT>) new InMemorySource(context.getOptions());
+ }
+
+ @Override
+ public Class<? extends SeaTunnelSource> getSourceClass() {
+ return InMemorySource.class;
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceReader.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceReader.java
new file mode 100644
index 0000000000..b8a366b228
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceReader.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.source.inmemory;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+public class InMemorySourceReader implements SourceReader<SeaTunnelRow, InMemorySourceSplit> {
+
+ private final Iterator<SeaTunnelRow> iterator;
+ private final SourceReader.Context context;
+ private final Deque<InMemorySourceSplit> sourceSplits = new ConcurrentLinkedDeque<>();
+ private volatile boolean noMoreSplit;
+
+ public InMemorySourceReader(List<SeaTunnelRow> rows, SourceReader.Context context) {
+ this.iterator = rows.iterator();
+ this.context = context;
+ }
+
+ @Override
+ public void open() throws Exception {}
+
+ @Override
+ public void close() {}
+
+ @Override
+ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+ synchronized (output.getCheckpointLock()) {
+ InMemorySourceSplit split = sourceSplits.poll();
+ if (null != split) {
+ while (iterator.hasNext()) {
+ SeaTunnelRow row = iterator.next();
+ output.collect(row);
+ }
+ } else if (noMoreSplit && sourceSplits.isEmpty()) {
+ context.signalNoMoreElement();
+ } else {
+ Thread.sleep(1000L);
+ }
+ }
+ }
+
+ @Override
+ public List<InMemorySourceSplit> snapshotState(long checkpointId) throws Exception {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void addSplits(List<InMemorySourceSplit> splits) {
+ sourceSplits.addAll(splits);
+ }
+
+ @Override
+ public void handleNoMoreSplits() {
+ noMoreSplit = true;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {}
+}
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceSplit.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceSplit.java
new file mode 100644
index 0000000000..208eea694e
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceSplit.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.source.inmemory;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+public class InMemorySourceSplit implements SourceSplit {
+
+ private final String splitId;
+
+ public InMemorySourceSplit(String splitId) {
+ this.splitId = splitId;
+ }
+
+ @Override
+ public String splitId() {
+ return splitId;
+ }
+
+ @Override
+ public String toString() {
+ return "InMemorySourceSplit{" + "splitId='" + splitId + '\'' + '}';
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceSplitEnumerator.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceSplitEnumerator.java
new file mode 100644
index 0000000000..e576249aff
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemorySourceSplitEnumerator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.source.inmemory;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class InMemorySourceSplitEnumerator
+ implements SourceSplitEnumerator<InMemorySourceSplit, InMemoryState> {
+
+ private final Context<InMemorySourceSplit> context;
+ private final Object lock = new Object();
+
+ public static final List<String> methodInvoked = new ArrayList<>();
+
+ public InMemorySourceSplitEnumerator(Context<InMemorySourceSplit> context) {
+ this.context = context;
+ }
+
+ public static List<String> getMethodInvoked() {
+ return methodInvoked;
+ }
+
+ @Override
+ public void open() {}
+
+ @Override
+ public void run() {
+ methodInvoked.add("run");
+ for (int i = 0; i < context.currentParallelism(); i++) {
+ synchronized (lock) {
+ context.assignSplit(i, new InMemorySourceSplit("split-" + i));
+ context.signalNoMoreSplits(i);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ // do nothing
+ }
+
+ @Override
+ public void addSplitsBack(List<InMemorySourceSplit> splits, int subtaskId) {
+ methodInvoked.add("addSplitsBack");
+ }
+
+ @Override
+ public int currentUnassignedSplitSize() {
+ return -1;
+ }
+
+ @Override
+ public void registerReader(int subtaskId) {
+ methodInvoked.add("registerReader_" + subtaskId);
+ }
+
+ @Override
+ public InMemoryState snapshotState(long checkpointId) {
+ synchronized (lock) {
+ return new InMemoryState();
+ }
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {}
+
+ @Override
+ public void handleSplitRequest(int subtaskId) {}
+}
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemoryState.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemoryState.java
new file mode 100644
index 0000000000..7a8ff4b0bf
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/source/inmemory/InMemoryState.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.source.inmemory;
+
+import java.io.Serializable;
+
+public class InMemoryState implements Serializable {}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 2fb46d5dd0..2cfd76c549 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -224,7 +224,9 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
SourceSplitEnumerator<SplitT, Serializable> enumerator = getEnumerator();
this.addTaskMemberMapping(readerId, memberAddr);
- enumerator.registerReader(readerId.getTaskIndex());
+ synchronized (this) {
+ enumerator.registerReader(readerId.getTaskIndex());
+ }
int taskSize = taskMemberMapping.size();
if (maxReaderSize == taskSize) {
readerRegisterComplete = true;
@@ -303,7 +305,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
reportTaskStatus(WAITING_RESTORE);
break;
case WAITING_RESTORE:
- if (restoreComplete.isDone()) {
+ if (restoreComplete.isDone() && readerRegisterComplete) {
currState = READY_START;
reportTaskStatus(READY_START);
} else {
@@ -311,7 +313,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
}
break;
case READY_START:
- if (startCalled && readerRegisterComplete) {
+ if (startCalled) {
currState = STARTING;
} else {
Thread.sleep(100);
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
index 4e5d864369..842ee173dd 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
@@ -145,22 +145,16 @@ public class CoordinatedSource<T, SplitT extends SourceSplit, StateT extends Ser
(subtaskId, splits) -> {
splitEnumerator.addSplitsBack(splits, subtaskId);
});
- readerMap
- .entrySet()
- .parallelStream()
- .forEach(
- entry -> {
- try {
- entry.getValue().open();
- readerContextMap
- .get(entry.getKey())
- .getEventListener()
- .onEvent(new ReaderOpenEvent());
- splitEnumerator.registerReader(entry.getKey());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
+ readerMap.forEach(
+ (key, value) -> {
+ try {
+ value.open();
+ readerContextMap.get(key).getEventListener().onEvent(new ReaderOpenEvent());
+ splitEnumerator.registerReader(key);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
}
@Override
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java
index 7d8052bfd1..c73cd863da 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceEnumerator.java
@@ -33,6 +33,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
/**
@@ -56,7 +57,7 @@ public class FlinkSourceEnumerator<SplitT extends SourceSplit, EnumStateT>
private final Object lock = new Object();
- private volatile boolean isRun = false;
+ private AtomicBoolean isRun = new AtomicBoolean(false);
private volatile int currentRegisterReaders = 0;
@@ -82,30 +83,33 @@ public class FlinkSourceEnumerator<SplitT extends
SourceSplit, EnumStateT>
@Override
public void addSplitsBack(List<SplitWrapper<SplitT>> splits, int subtaskId) {
- sourceSplitEnumerator.addSplitsBack(
- splits.stream().map(SplitWrapper::getSourceSplit).collect(Collectors.toList()),
- subtaskId);
+ synchronized (lock) {
+ sourceSplitEnumerator.addSplitsBack(
+ splits.stream().map(SplitWrapper::getSourceSplit).collect(Collectors.toList()),
+ subtaskId);
+ }
}
@Override
public void addReader(int subtaskId) {
- sourceSplitEnumerator.registerReader(subtaskId);
synchronized (lock) {
+ sourceSplitEnumerator.registerReader(subtaskId);
currentRegisterReaders++;
- if (!isRun && currentRegisterReaders == parallelism) {
- try {
- sourceSplitEnumerator.run();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- isRun = true;
+ }
+ if (currentRegisterReaders == parallelism && !isRun.getAndSet(true)) {
+ try {
+ sourceSplitEnumerator.run();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
}
@Override
public EnumStateT snapshotState(long checkpointId) throws Exception {
- return sourceSplitEnumerator.snapshotState(checkpointId);
+ synchronized (lock) {
+ return sourceSplitEnumerator.snapshotState(checkpointId);
+ }
}
@Override