This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 814b19537c [Fix][Mongo-CDC] Fix the issue where mongo isExactlyOnce defaults to true, causing room to malfunction (#9454)
814b19537c is described below
commit 814b19537c9c19520600b06cb42807791d15dbf2
Author: wanmingshi <[email protected]>
AuthorDate: Thu Jun 26 21:10:42 2025 +0800
[Fix][Mongo-CDC] Fix the issue where mongo isExactlyOnce defaults to true, causing room to malfunction (#9454)
---
docs/en/connector-v2/source/MongoDB-CDC.md | 1 +
docs/zh/connector-v2/source/MongoDB-CDC.md | 33 +++++++++++-----------
.../cdc/mongodb/MongodbIncrementalSource.java | 2 ++
.../cdc/mongodb/config/MongodbSourceConfig.java | 8 ++++--
.../config/MongodbSourceConfigProvider.java | 10 ++++++-
.../cdc/mongodb/config/MongodbSourceOptions.java | 6 ++++
.../source/fetch/MongodbFetchTaskContext.java | 2 +-
7 files changed, 42 insertions(+), 20 deletions(-)
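
Note: the change below threads a new `exactly_once` option (default `false`) from the user config through the builder into `MongodbSourceConfig`, and `MongodbFetchTaskContext.isExactlyOnce()` now delegates to it instead of returning `true`. The following is a minimal, self-contained sketch of that wiring. Every class name in it (`SketchSourceConfig`, `SketchFetchTaskContext`, `ExactlyOnceSketch`) is a hypothetical stand-in, not a SeaTunnel class, and the hard-coded `false` default only mirrors `EXACTLY_ONCE.defaultValue()` from the diff.

```java
import java.util.Optional;

// Hypothetical stand-in for MongodbSourceConfig: holds the flag instead of hard-coding true.
final class SketchSourceConfig {
    private final boolean exactlyOnce;

    SketchSourceConfig(boolean exactlyOnce) {
        this.exactlyOnce = exactlyOnce;
    }

    boolean isExactlyOnce() {
        return exactlyOnce;
    }

    static Builder builder() {
        return new Builder();
    }

    // Hypothetical stand-in for MongodbSourceConfigProvider.Builder.
    static final class Builder {
        // Assumption: mirrors EXACTLY_ONCE.defaultValue(), which the diff sets to false.
        private boolean exactlyOnce = false;

        Builder exactlyOnce(boolean exactlyOnce) {
            this.exactlyOnce = exactlyOnce;
            return this;
        }

        SketchSourceConfig build() {
            return new SketchSourceConfig(exactlyOnce);
        }
    }
}

// Hypothetical stand-in for MongodbFetchTaskContext: delegates to the config.
final class SketchFetchTaskContext {
    private final SketchSourceConfig sourceConfig;

    SketchFetchTaskContext(SketchSourceConfig sourceConfig) {
        this.sourceConfig = sourceConfig;
    }

    boolean isExactlyOnce() {
        return sourceConfig.isExactlyOnce();
    }
}

final class ExactlyOnceSketch {
    // userValue models the value read from the job config; null means the option was not set.
    static SketchFetchTaskContext contextFor(Boolean userValue) {
        SketchSourceConfig.Builder builder = SketchSourceConfig.builder();
        // Same shape as the diff: only override the default when a value is present.
        Optional.ofNullable(userValue).ifPresent(builder::exactlyOnce);
        return new SketchFetchTaskContext(builder.build());
    }

    public static void main(String[] args) {
        System.out.println(contextFor(null).isExactlyOnce()); // option not set
        System.out.println(contextFor(true).isExactlyOnce()); // option set to true
    }
}
```

In this sketch an unset option leaves the flag at `false`, and only an explicit `true` turns exactly-once on, which is the behavior the new documentation row describes.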
diff --git a/docs/en/connector-v2/source/MongoDB-CDC.md b/docs/en/connector-v2/source/MongoDB-CDC.md
index 43f51d2edd..1814d023e8 100644
--- a/docs/en/connector-v2/source/MongoDB-CDC.md
+++ b/docs/en/connector-v2/source/MongoDB-CDC.md
@@ -122,6 +122,7 @@ For specific types in MongoDB, we use Extended JSON format to map them to Seatun
| poll.await.time.ms                 | Long   | No       | 1000    | The amount of time to wait before checking for new results on the change stream. |
| heartbeat.interval.ms              | String | No       | 0       | The length of time in milliseconds between sending heartbeat messages. Use 0 to disable. |
| incremental.snapshot.chunk.size.mb | Long   | No       | 64      | The chunk size mb of incremental snapshot. |
+| exactly_once                       | Boolean| No       | false   | Enable exactly once semantic. Enabling this may cause an out-of-memory risk during the large table snapshot stage in recovery. |
| common-options                     |        | No       | -       | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. |
### Tips
diff --git a/docs/zh/connector-v2/source/MongoDB-CDC.md b/docs/zh/connector-v2/source/MongoDB-CDC.md
index 24ab23c1a1..6e8783bc31 100644
--- a/docs/zh/connector-v2/source/MongoDB-CDC.md
+++ b/docs/zh/connector-v2/source/MongoDB-CDC.md
@@ -107,22 +107,23 @@ db.createUser(
## 源配置项
-| Name                               | 类型   | 必须     | 默认值  | 描述 |
-|------------------------------------|--------|----------|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| hosts                              | String | 是       | -       | MongoDB服务器的主机名和端口对的逗号分隔列表。如 `localhost:27017,localhost:27018` |
-| username                           | String | 否       | -       | 连接到MongoDB时要使用的数据库用户的名称。 |
-| password                           | String | 否       | -       | 连接到MongoDB时使用的密码。 |
-| database                           | List   | 是       | -       | 要监视更改的数据库的名称。如果未设置,则将捕获所有数据库。该数据库还支持正则表达式,以监视与正则表达式匹配的多个数据库。例如db1、db2。 |
-| collection                         | List   | 是       | -       | 要监视更改的数据库中集合的名称。如果未设置,则将捕获所有集合。该集合还支持正则表达式来监视与完全限定的集合标识符匹配的多个集合。例如db1.coll1、db2.coll2。 |
-| schema                             |        | 否       | -       | 数据的结构,包括字段名和字段类型,使用单表cdc。 |
-| tables_configs                     |        | 否       | -       | 数据的结构,包括字段名和字段类型,使用多表cdc。 |
-| connection.options                 | String | 否       | -       | 与号分隔了MongoDB的连接选项。如。 `replicaSet=test&connectTimeoutMS=300000`. |
-| batch.size                         | Long   | 否       | 1024    | 批量大小。 |
-| poll.max.batch.size                | Enum   | 否       | 1024    | 轮询新数据时,单个批中包含的更改流文档的最大数量。 |
-| poll.await.time.ms                 | Long   | 否       | 1000    | 在检查更改流上的新结果之前等待的时间量。 |
-| heartbeat.interval.ms              | String | 否       | 0       | 发送心跳消息之间的时间长度(毫秒)。使用0禁用。 |
-| incremental.snapshot.chunk.size.mb | Long   | 否       | 64      | 增量快照的块大小(mb)。 |
-| common-options                     |        | 否       | -       | 源插件常用参数,请参考 [Source Common Options](../source-common-options.md) |
+| Name                               | 类型   | 必须     | 默认值 | 描述 |
+|------------------------------------|--------|----------|-------|---------------------------------------------------------------------------------------|
+| hosts                              | String | 是       | -     | MongoDB服务器的主机名和端口对的逗号分隔列表。如 `localhost:27017,localhost:27018` |
+| username                           | String | 否       | -     | 连接到MongoDB时要使用的数据库用户的名称。 |
+| password                           | String | 否       | -     | 连接到MongoDB时使用的密码。 |
+| database                           | List   | 是       | -     | 要监视更改的数据库的名称。如果未设置,则将捕获所有数据库。该数据库还支持正则表达式,以监视与正则表达式匹配的多个数据库。例如db1、db2。 |
+| collection                         | List   | 是       | -     | 要监视更改的数据库中集合的名称。如果未设置,则将捕获所有集合。该集合还支持正则表达式来监视与完全限定的集合标识符匹配的多个集合。例如db1.coll1、db2.coll2。 |
+| schema                             |        | 否       | -     | 数据的结构,包括字段名和字段类型,使用单表cdc。 |
+| tables_configs                     |        | 否       | -     | 数据的结构,包括字段名和字段类型,使用多表cdc。 |
+| connection.options                 | String | 否       | -     | 与号分隔了MongoDB的连接选项。如。 `replicaSet=test&connectTimeoutMS=300000`. |
+| batch.size                         | Long   | 否       | 1024  | 批量大小。 |
+| poll.max.batch.size                | Enum   | 否       | 1024  | 轮询新数据时,单个批中包含的更改流文档的最大数量。 |
+| poll.await.time.ms                 | Long   | 否       | 1000  | 在检查更改流上的新结果之前等待的时间量。 |
+| heartbeat.interval.ms              | String | 否       | 0     | 发送心跳消息之间的时间长度(毫秒)。使用0禁用。 |
+| incremental.snapshot.chunk.size.mb | Long   | 否       | 64    | 增量快照的块大小(mb)。 |
+| exactly_once                       | Boolean| 否       | false | 启用精确一次语义,若开启在大表快照阶段恢复时会有内存溢出风险。 |
+| common-options                     |        | 否       | -     | 源插件常用参数,请参考 [Source Common Options](../source-common-options.md) |
### 提示
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java
index 5a93b45c3f..996eda76e8 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java
@@ -89,6 +89,8 @@ public class MongodbIncrementalSource<T> extends IncrementalSource<T, MongodbSou
.ifPresent(builder::connectionOptions);
Optional.ofNullable(config.get(MongodbSourceOptions.BATCH_SIZE))
.ifPresent(builder::batchSize);
+ Optional.ofNullable(config.get(MongodbSourceOptions.EXACTLY_ONCE))
+ .ifPresent(builder::exactlyOnce);
Optional.ofNullable(config.get(MongodbSourceOptions.POLL_MAX_BATCH_SIZE))
.ifPresent(builder::pollMaxBatchSize);
Optional.ofNullable(config.get(MongodbSourceOptions.POLL_AWAIT_TIME_MILLIS))
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceConfig.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceConfig.java
index 049b37db36..c8220f09cf 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceConfig.java
@@ -65,6 +65,8 @@ public class MongodbSourceConfig implements SourceConfig {
private final int splitSizeMB;
+ private final boolean exactlyOnce;
+
MongodbSourceConfig(
String hosts,
String username,
@@ -80,7 +82,8 @@ public class MongodbSourceConfig implements SourceConfig {
StopConfig stopOptions,
int heartbeatIntervalMillis,
int splitMetaGroupSize,
- int splitSizeMB) {
+ int splitSizeMB,
+ boolean exactlyOnce) {
this.hosts = checkNotNull(hosts);
this.username = username;
this.password = password;
@@ -98,6 +101,7 @@ public class MongodbSourceConfig implements SourceConfig {
this.heartbeatIntervalMillis = heartbeatIntervalMillis;
this.splitMetaGroupSize = splitMetaGroupSize;
this.splitSizeMB = splitSizeMB;
+ this.exactlyOnce = exactlyOnce;
}
@Override
@@ -117,6 +121,6 @@ public class MongodbSourceConfig implements SourceConfig {
@Override
public boolean isExactlyOnce() {
- return true;
+ return exactlyOnce;
}
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceConfigProvider.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceConfigProvider.java
index 633a4117fd..0dfaac2e66 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceConfigProvider.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceConfigProvider.java
@@ -29,6 +29,7 @@ import java.util.Objects;
import static org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.BATCH_SIZE;
+import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.EXACTLY_ONCE;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.HEARTBEAT_INTERVAL_MILLIS;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB;
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.POLL_AWAIT_TIME_MILLIS;
@@ -58,6 +59,7 @@ public class MongodbSourceConfigProvider {
private StopConfig stopOptions;
private int heartbeatIntervalMillis = HEARTBEAT_INTERVAL_MILLIS.defaultValue();
private int splitMetaGroupSize = 2;
+ private boolean exactlyOnce = EXACTLY_ONCE.defaultValue();
private int splitSizeMB = INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB.defaultValue();
public Builder hosts(String hosts) {
@@ -90,6 +92,11 @@ public class MongodbSourceConfigProvider {
return this;
}
+ public Builder exactlyOnce(boolean exactlyOnce) {
+ this.exactlyOnce = exactlyOnce;
+ return this;
+ }
+
public Builder batchSize(int batchSize) {
checkArgument(batchSize >= 0);
this.batchSize = batchSize;
@@ -169,7 +176,8 @@ public class MongodbSourceConfigProvider {
stopOptions,
heartbeatIntervalMillis,
splitMetaGroupSize,
- splitSizeMB);
+ splitSizeMB,
+ exactlyOnce);
}
}
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java
index 653458b276..f4c4ccee9c 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java
@@ -234,6 +234,12 @@ public class MongodbSourceOptions extends SourceOptions {
"The amount of time to wait before checking for new results on the change stream."
+ "Defaults: 1000.");
+ public static final Option<Boolean> EXACTLY_ONCE =
+ Options.key("exactly_once")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Enable exactly once semantic.");
+
public static final Option<Integer> HEARTBEAT_INTERVAL_MILLIS =
Options.key("heartbeat.interval.ms")
.intType()
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java
index 9275c54bd2..ef514104d6 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java
@@ -139,7 +139,7 @@ public class MongodbFetchTaskContext implements FetchTask.Context {
@Override
public boolean isExactlyOnce() {
- return true;
+ return sourceConfig.isExactlyOnce();
}
@Override