This is an automated email from the ASF dual-hosted git repository.
chl-wxp pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a9be10ac30 [Feature][Connector-V2][MongoDB-CDC] Support latest-offset startup mode without initial snapshot (#11053)
a9be10ac30 is described below
commit a9be10ac3029bc716f8fb7723e97f3b82a014312
Author: Gabriel Baldez <[email protected]>
AuthorDate: Thu Jun 11 05:23:21 2026 -0300
[Feature][Connector-V2][MongoDB-CDC] Support latest-offset startup mode without initial snapshot (#11053)
---
docs/en/connectors/source/MongoDB-CDC.md | 33 ++++++++++++++
docs/zh/connectors/source/MongoDB-CDC.md | 33 ++++++++++++++
.../config/MongodbIncrementalSourceOptions.java | 10 +++--
.../config/MongodbSourceConfigProvider.java | 1 +
.../MongodbIncrementalSourceFactoryTest.java | 51 +++++++++++++++++++++-
5 files changed, 123 insertions(+), 5 deletions(-)
diff --git a/docs/en/connectors/source/MongoDB-CDC.md b/docs/en/connectors/source/MongoDB-CDC.md
index 3656eb4497..3c153350fb 100644
--- a/docs/en/connectors/source/MongoDB-CDC.md
+++ b/docs/en/connectors/source/MongoDB-CDC.md
@@ -127,9 +127,42 @@ For specific types in MongoDB, we use Extended JSON format to map them to Seatun
| poll.await.time.ms | Long | No | 1000 | The amount of time to wait before checking for new results on the change stream. |
| heartbeat.interval.ms | String | No | 0 | The length of time in milliseconds between sending heartbeat messages. Use 0 to disable. |
| incremental.snapshot.chunk.size.mb | Long | No | 64 | The chunk size, in MB, of the incremental snapshot. |
+| startup.mode | Enum | No | INITIAL | Optional startup mode for the MongoDB CDC consumer. Valid values are `initial`, `latest` and `timestamp`. See the [Startup Mode](#startup-mode) section below. |
+| startup.timestamp | Long | No | - | Start from the specified epoch timestamp (in milliseconds). Only used when `startup.mode` is `timestamp`. |
| exactly_once | Boolean | No | false | Enable exactly-once semantics. Enabling it risks running out of memory during the snapshot stage of large tables when a job is recovered. |
| common-options | | No | - | Common source plugin parameters; see [Source Common Options](../common-options/source-common-options.md) for details. |
+### Startup Mode
+
+The `startup.mode` option controls where the connector starts reading when a job is submitted:
+
+- `initial` (default): reads a snapshot of the monitored collections first, then switches to the change stream.
+- `latest`: skips the snapshot entirely and starts from the latest change-stream position, so only changes made after the job starts are captured. Snapshot-related options such as `incremental.snapshot.chunk.size.mb` are ignored in this mode.
+- `timestamp`: skips the snapshot and starts reading the change stream from the position given by `startup.timestamp`.
+
+When a job is restored from a checkpoint or savepoint, it resumes from the checkpointed change-stream position regardless of `startup.mode`, so a restart never falls back to a new snapshot.
+
+For example, to consume only changes made after the job starts:
+
+```hocon
+source {
+  MongoDB-CDC {
+    hosts = "mongo0:27017"
+    database = ["inventory"]
+    collection = ["inventory.products"]
+    startup.mode = "latest"
+    schema = {
+      fields {
+        "_id" : string,
+        "name" : string,
+        "description" : string,
+        "weight" : string
+      }
+    }
+  }
+}
+```
+
### Tips
> 1. If the collection changes slowly, it is strongly recommended to set `heartbeat.interval.ms` to an appropriate value greater than 0. When a SeaTunnel job is recovered from a checkpoint or savepoint, the heartbeat events push the resumeToken forward and keep it from expiring.<br/>
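The startup rules documented above can be summarized in a small, self-contained Java sketch. Everything in it is hypothetical: `StartupPositionSketch`, `Mode` and `StartPosition` are illustrative stand-ins rather than SeaTunnel classes, positions are simplified to epoch milliseconds instead of MongoDB resume tokens, and both the "position captured at job start" detail for `initial` and the check that `startup.timestamp` is present in `timestamp` mode are assumptions, not documented behavior.

```java
import java.util.Optional;

// Hypothetical sketch of the documented startup rules; not SeaTunnel code.
// Positions are simplified to epoch milliseconds instead of MongoDB resume tokens.
final class StartupPositionSketch {

    enum Mode { INITIAL, LATEST, TIMESTAMP }

    static final class StartPosition {
        final boolean readSnapshot;          // true only when a snapshot is read first
        final long changeStreamStartMillis;  // where change-stream reading begins

        StartPosition(boolean readSnapshot, long changeStreamStartMillis) {
            this.readSnapshot = readSnapshot;
            this.changeStreamStartMillis = changeStreamStartMillis;
        }
    }

    static StartPosition resolve(
            Mode mode,
            Long startupTimestampMillis,
            Optional<Long> restoredPositionMillis,
            long nowMillis) {
        // A restored checkpoint or savepoint wins regardless of startup.mode,
        // so a restart never falls back to a new snapshot.
        if (restoredPositionMillis.isPresent()) {
            return new StartPosition(false, restoredPositionMillis.get());
        }
        switch (mode) {
            case INITIAL:
                // Snapshot first; assumed here that the stream position is captured at job start.
                return new StartPosition(true, nowMillis);
            case LATEST:
                // No snapshot: only changes made after the job starts are captured.
                return new StartPosition(false, nowMillis);
            case TIMESTAMP:
                // Assumed validation: timestamp mode needs startup.timestamp.
                if (startupTimestampMillis == null) {
                    throw new IllegalArgumentException(
                            "startup.timestamp is required when startup.mode is timestamp");
                }
                return new StartPosition(false, startupTimestampMillis);
            default:
                throw new IllegalArgumentException("Unsupported startup mode: " + mode);
        }
    }
}
```

The restore check comes first on purpose: it encodes the documented rule that a checkpointed position overrides `startup.mode`, so no mode can trigger a second snapshot after a restart.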
diff --git a/docs/zh/connectors/source/MongoDB-CDC.md b/docs/zh/connectors/source/MongoDB-CDC.md
index 0770da5054..83782ea380 100644
--- a/docs/zh/connectors/source/MongoDB-CDC.md
+++ b/docs/zh/connectors/source/MongoDB-CDC.md
@@ -127,9 +127,42 @@ db.grantRolesToUser("<USER_NAME>", ["<ROLE_NAME>"])
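| poll.await.time.ms | Long | No | 1000 | The amount of time to wait before checking the change stream for new results. |
| heartbeat.interval.ms | String | No | 0 | The interval, in milliseconds, between heartbeat messages. Use 0 to disable. |
| incremental.snapshot.chunk.size.mb | Long | No | 64 | The chunk size, in MB, of the incremental snapshot. |
+| startup.mode | Enum | No | INITIAL | Optional startup mode for the MongoDB CDC consumer. Valid values are `initial`, `latest` and `timestamp`. See the [Startup Mode](#startup-mode) section below. |
+| startup.timestamp | Long | No | - | Start consuming from the specified epoch timestamp (in milliseconds). Only used when `startup.mode` is `timestamp`. |
| exactly_once | Boolean | No | false | Enable exactly-once semantics. If enabled, there is a risk of running out of memory when recovering during the snapshot stage of large tables. |
| common-options | | No | - | Common source plugin parameters; see [Source Common Options](../common-options/source-common-options.md) |
+### Startup Mode
+
+The `startup.mode` option controls where the connector starts reading when a job is submitted:
+
+- `initial` (default): first reads a snapshot of the monitored collections, then switches to the change stream.
+- `latest`: skips the snapshot entirely and starts from the latest change-stream position, capturing only changes made after the job starts. In this mode, snapshot-related options (such as `incremental.snapshot.chunk.size.mb`) are ignored.
+- `timestamp`: skips the snapshot and starts reading the change stream from the position specified by `startup.timestamp`.
+
+When a job is restored from a checkpoint or savepoint, it continues consuming from the change-stream position recorded in the checkpoint regardless of the value of `startup.mode`; a restart never falls back to re-running the snapshot.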
+
+For example, to consume only changes made after the job starts:
+
+```hocon
+source {
+  MongoDB-CDC {
+    hosts = "mongo0:27017"
+    database = ["inventory"]
+    collection = ["inventory.products"]
+    startup.mode = "latest"
+    schema = {
+      fields {
+        "_id" : string,
+        "name" : string,
+        "description" : string,
+        "weight" : string
+      }
+    }
+  }
+}
+```
+
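### Tips
> 1. If the collection changes slowly, it is strongly recommended to set the heartbeat.interval.ms parameter to an appropriate value greater than 0. When a SeaTunnel job is recovered from a checkpoint or savepoint, heartbeat events can push the resumeToken forward to keep it from expiring.<br/>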
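`startup.timestamp` takes an epoch timestamp in milliseconds. The following minimal sketch shows one way to compute such a value with the standard `java.time` API; the `StartupTimestampSketch` class and its method names are hypothetical, and the `-03:00` offset in the example is only an illustration.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical helper for producing a startup.timestamp value (epoch milliseconds).
final class StartupTimestampSketch {

    // An ISO-8601 instant in UTC, e.g. "2026-06-11T08:00:00Z".
    static long fromUtcInstant(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }

    // A wall-clock time interpreted in the given zone, e.g. ("2026-06-11T05:00:00", "-03:00").
    static long fromLocalTime(String isoLocalDateTime, String zoneId) {
        return LocalDateTime.parse(isoLocalDateTime)
                .atZone(ZoneId.of(zoneId))
                .toInstant()
                .toEpochMilli();
    }

    public static void main(String[] args) {
        // Prints a value that could be used as: startup.timestamp = <value>
        System.out.println(fromLocalTime("2026-06-11T05:00:00", "-03:00"));
    }
}
```

Both helpers describe the same instant when the offsets line up: `fromLocalTime("2026-06-11T05:00:00", "-03:00")` equals `fromUtcInstant("2026-06-11T08:00:00Z")`.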
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbIncrementalSourceOptions.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbIncrementalSourceOptions.java
index 1a8ac0a5e6..ff2449e15d 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbIncrementalSourceOptions.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbIncrementalSourceOptions.java
@@ -136,11 +136,15 @@ public class MongodbIncrementalSourceOptions extends SourceOptions implements Ta
Options.key(SourceOptions.STARTUP_MODE_KEY)
.singleChoice(
StartupMode.class,
- Arrays.asList(StartupMode.INITIAL, StartupMode.TIMESTAMP))
+ Arrays.asList(
+ StartupMode.INITIAL, StartupMode.LATEST, StartupMode.TIMESTAMP))
.defaultValue(StartupMode.INITIAL)
.withDescription(
- "Optional startup mode for CDC source, valid enumerations are "
- + "\"initial\", \"earliest\", \"latest\", \"timestamp\"\n or \"specific\"");
+ "Optional startup mode for MongoDB CDC source, valid enumerations are "
+ + "\"initial\", \"latest\" or \"timestamp\". "
+ + "\"initial\": reads a snapshot of the monitored collections first and then switches to the change stream. "
+ + "\"latest\": skips the snapshot entirely and starts from the latest change-stream position, so only changes made after the job starts are captured. "
+ + "\"timestamp\": skips the snapshot and starts reading the change stream from the position given by \"startup.timestamp\".");
public static final SingleChoiceOption<StopMode> STOP_MODE =
Options.key(SourceOptions.STOP_MODE_KEY)
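The option now advertises only `INITIAL`, `LATEST` and `TIMESTAMP`, while the documentation writes the values in lowercase. The sketch below illustrates how a lowercase config string could be mapped onto those choices and how the other modes named in the old description (`earliest`, `specific`) would be rejected. The enum is a local stand-in for SeaTunnel's `StartupMode`, `StartupModeParsingSketch` and `parse` are hypothetical names, and the case-insensitive parsing is an assumption about, not a copy of, SeaTunnel's option handling.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical sketch: map a startup.mode string onto the MongoDB CDC choices.
final class StartupModeParsingSketch {

    // Local stand-in mirroring the values named in the old option description.
    enum StartupMode { INITIAL, EARLIEST, LATEST, SPECIFIC, TIMESTAMP }

    // The choices the MongoDB CDC option advertises after this change.
    static final List<StartupMode> MONGODB_CHOICES =
            Arrays.asList(StartupMode.INITIAL, StartupMode.LATEST, StartupMode.TIMESTAMP);

    static StartupMode parse(String configValue) {
        StartupMode mode;
        try {
            // Assumed case-insensitive match: "latest" -> LATEST.
            mode = StartupMode.valueOf(configValue.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Unknown startup.mode: " + configValue, e);
        }
        // A known mode that MongoDB CDC does not support is still rejected.
        if (!MONGODB_CHOICES.contains(mode)) {
            throw new IllegalArgumentException(
                    "startup.mode '" + configValue + "' is not supported by MongoDB CDC; valid values are "
                            + MONGODB_CHOICES);
        }
        return mode;
    }
}
```

Using `Locale.ROOT` keeps the upper-casing independent of the JVM's default locale, which matters for locale-sensitive letters such as the Turkish dotless i.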
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceConfigProvider.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceConfigProvider.java
index cbb835129a..e7a69c5e0b 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceConfigProvider.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceConfigProvider.java
@@ -111,6 +111,7 @@ public class MongodbSourceConfigProvider {
public Builder startupOptions(StartupConfig startupOptions) {
this.startupOptions = Objects.requireNonNull(startupOptions);
if (startupOptions.getStartupMode() != StartupMode.INITIAL
+ && startupOptions.getStartupMode() != StartupMode.LATEST
&& startupOptions.getStartupMode() !=
StartupMode.TIMESTAMP) {
throw new MongodbConnectorException(
ILLEGAL_ARGUMENT,
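This hunk adds `StartupMode.LATEST` to the builder's guard so that it matches the choices listed in `MongodbIncrementalSourceOptions`; the regression test in this commit notes that the builder previously rejected a mode the option rule advertised. One way to keep the two lists from drifting apart again is to derive both from a single set. The sketch below is a suggestion, not what this commit does; `SupportedStartupModesSketch`, `optionValues` and `requireSupported` are hypothetical names, and the enum is a local stand-in.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Hypothetical sketch: one shared set feeds both the option rule and the builder guard.
final class SupportedStartupModesSketch {

    // Local stand-in for the CDC StartupMode enum.
    enum StartupMode { INITIAL, EARLIEST, LATEST, SPECIFIC, TIMESTAMP }

    // Single source of truth for the modes MongoDB CDC supports.
    static final Set<StartupMode> SUPPORTED =
            EnumSet.of(StartupMode.INITIAL, StartupMode.LATEST, StartupMode.TIMESTAMP);

    // What an option rule would advertise; EnumSet iterates in declaration order.
    static List<StartupMode> optionValues() {
        return new ArrayList<>(SUPPORTED);
    }

    // What a config builder would enforce.
    static StartupMode requireSupported(StartupMode mode) {
        Objects.requireNonNull(mode, "startup mode");
        if (!SUPPORTED.contains(mode)) {
            throw new IllegalArgumentException("Unsupported startup mode: " + mode);
        }
        return mode;
    }
}
```

Because `EnumSet` iterates in declaration order, `optionValues()` yields `[INITIAL, LATEST, TIMESTAMP]`, the same order the factory test expects.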
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/source/MongodbIncrementalSourceFactoryTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/source/MongodbIncrementalSourceFactoryTest.java
index 8720c7556c..7fe549f80a 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/source/MongodbIncrementalSourceFactoryTest.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/source/MongodbIncrementalSourceFactoryTest.java
@@ -18,9 +18,16 @@
package mongodb.source;
import org.apache.seatunnel.api.configuration.SingleChoiceOption;
+import org.apache.seatunnel.connectors.cdc.base.config.StartupConfig;
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
+import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.MongodbIncrementalSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConfigProvider;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffset;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffsetFactory;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -34,7 +41,7 @@ public class MongodbIncrementalSourceFactoryTest {
}
@Test
- public void testWithUnsupportedStartUpMode() {
+ public void testSupportedStartUpModes() {
MongodbIncrementalSourceFactory mongodbIncrementalSourceFactory =
new MongodbIncrementalSourceFactory();
mongodbIncrementalSourceFactory.optionRule().getOptionalOptions().stream()
@@ -42,8 +49,48 @@ public class MongodbIncrementalSourceFactoryTest {
.forEach(
(option) -> {
Assertions.assertIterableEquals(
- Arrays.asList(StartupMode.INITIAL, StartupMode.TIMESTAMP),
+ Arrays.asList(
+ StartupMode.INITIAL,
+ StartupMode.LATEST,
+ StartupMode.TIMESTAMP),
((SingleChoiceOption<StartupMode>) option).getOptionValues());
});
}
+
+ @Test
+ public void testSourceConfigBuilderAcceptsLatestStartupMode() {
+ // Regression for the real source-assembly path: the builder used to reject
+ // StartupMode.LATEST at runtime even though the option rule advertised it.
+ MongodbSourceConfig config =
+ MongodbSourceConfigProvider.newBuilder()
+ .hosts("localhost:27017")
+ .startupOptions(new StartupConfig(StartupMode.LATEST, null, null, null))
+ .validate()
+ .create(0);
+
+ Assertions.assertEquals(StartupMode.LATEST, config.getStartupConfig().getStartupMode());
+ }
+
+ @Test
+ public void testSourceConfigBuilderRejectsUnsupportedStartupMode() {
+ Assertions.assertThrows(
+ MongodbConnectorException.class,
+ () ->
+ MongodbSourceConfigProvider.newBuilder()
+ .startupOptions(
+ new StartupConfig(StartupMode.EARLIEST, null, null, null)));
+ }
+
+ @Test
+ public void testLatestStartupModeResolvesToLatestChangeStreamOffset() {
+ StartupConfig startupConfig = new StartupConfig(StartupMode.LATEST, null, null, null);
+
+ Offset startupOffset = startupConfig.getStartupOffset(new ChangeStreamOffsetFactory());
+
+ Assertions.assertInstanceOf(ChangeStreamOffset.class, startupOffset);
+ // The latest offset is a current-time change-stream position, not a resume token from a
+ // snapshot: starting here means only changes made after the job starts are consumed.
+ Assertions.assertNotNull(((ChangeStreamOffset) startupOffset).getTimestamp());
+ Assertions.assertNull(((ChangeStreamOffset) startupOffset).getResumeToken());
+ }
}
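The last test asserts that a `latest` start resolves to a `ChangeStreamOffset` that carries a timestamp and no resume token. MongoDB change streams can be opened either from a resume token (`resumeAfter` / `startAfter`) or from an operation time (`startAtOperationTime`), so an offset shaped like this would presumably be opened with `startAtOperationTime`. The sketch below models that choice only as a description string; the `ChangeStreamStartSketch` class, its `Offset` type and the `describeStart` method are hypothetical, it makes no MongoDB driver calls, and the millisecond-to-second truncation is an assumption about how the connector converts the timestamp.

```java
// Hypothetical model of how a change-stream offset could map to a start option.
// It only builds a description string; no MongoDB driver calls are made.
final class ChangeStreamStartSketch {

    static final class Offset {
        final String resumeToken;    // null for a fresh "latest" start
        final Long timestampMillis;  // epoch milliseconds, used when no token exists

        Offset(String resumeToken, Long timestampMillis) {
            this.resumeToken = resumeToken;
            this.timestampMillis = timestampMillis;
        }
    }

    // A "latest" offset: the current time and no resume token, as the test asserts.
    static Offset latest(long nowMillis) {
        return new Offset(null, nowMillis);
    }

    static String describeStart(Offset offset) {
        if (offset.resumeToken != null) {
            // A known token resumes right after the last consumed event.
            return "resumeAfter(" + offset.resumeToken + ")";
        }
        if (offset.timestampMillis == null) {
            throw new IllegalStateException("offset has neither a resume token nor a timestamp");
        }
        // BSON timestamps hold whole seconds plus an ordinal, so millis are truncated (assumption).
        long seconds = offset.timestampMillis / 1000L;
        return "startAtOperationTime(" + seconds + ", 0)";
    }
}
```

Preferring the token when one exists mirrors the restore behavior described in the docs: a checkpointed position, not the wall clock, decides where a restarted job continues.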