This is an automated email from the ASF dual-hosted git repository.

chl-wxp pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a9be10ac30 [Feature][Connector-V2][MongoDB-CDC] Support latest-offset startup mode without initial snapshot (#11053)
a9be10ac30 is described below

commit a9be10ac3029bc716f8fb7723e97f3b82a014312
Author: Gabriel Baldez <[email protected]>
AuthorDate: Thu Jun 11 05:23:21 2026 -0300

    [Feature][Connector-V2][MongoDB-CDC] Support latest-offset startup mode without initial snapshot (#11053)
---
 docs/en/connectors/source/MongoDB-CDC.md           | 33 ++++++++++++++
 docs/zh/connectors/source/MongoDB-CDC.md           | 33 ++++++++++++++
 .../config/MongodbIncrementalSourceOptions.java    | 10 +++--
 .../config/MongodbSourceConfigProvider.java        |  1 +
 .../MongodbIncrementalSourceFactoryTest.java       | 51 +++++++++++++++++++++-
 5 files changed, 123 insertions(+), 5 deletions(-)

diff --git a/docs/en/connectors/source/MongoDB-CDC.md b/docs/en/connectors/source/MongoDB-CDC.md
index 3656eb4497..3c153350fb 100644
--- a/docs/en/connectors/source/MongoDB-CDC.md
+++ b/docs/en/connectors/source/MongoDB-CDC.md
@@ -127,9 +127,42 @@ For specific types in MongoDB, we use Extended JSON format to map them to Seatun
 | poll.await.time.ms                 | Long   | No       | 1000    | The amount of time to wait before checking for new results on the change stream. |
 | heartbeat.interval.ms              | String | No       | 0       | The length of time in milliseconds between sending heartbeat messages. Use 0 to disable. |
 | incremental.snapshot.chunk.size.mb | Long   | No       | 64      | The chunk size (in MB) of the incremental snapshot. |
+| startup.mode                       | Enum   | No       | INITIAL | Optional startup mode for the MongoDB CDC consumer. Valid values are `initial`, `latest` and `timestamp`. See the [Startup Mode](#startup-mode) section below. |
+| startup.timestamp                  | Long   | No       | -       | Start from the specified epoch timestamp (in milliseconds). Only used when `startup.mode` is `timestamp`. |
 | exactly_once                       | Boolean| No       | false   | Enable exactly-once semantics. Enabling it may risk out-of-memory errors when recovering during the snapshot stage of a large table. |
 | common-options                     |        | No       | -       | Common source plugin parameters; please refer to [Source Common Options](../common-options/source-common-options.md) for details. |
 
+### Startup Mode
+
+The `startup.mode` option controls where the connector starts reading when a job is submitted:
+
+- `initial` (default): reads a snapshot of the monitored collections first, then switches to the change stream.
+- `latest`: skips the snapshot entirely and starts from the latest change-stream position, so only changes made after the job starts are captured. Snapshot-related options such as `incremental.snapshot.chunk.size.mb` are ignored in this mode.
+- `timestamp`: skips the snapshot and starts reading the change stream from the position given by `startup.timestamp`.
+
+When a job is restored from a checkpoint or savepoint, it resumes from the checkpointed change-stream position regardless of `startup.mode`, so a restart never falls back to a new snapshot.
+
+For example, to consume only changes made after the job starts:
+
+```hocon
+source {
+  MongoDB-CDC {
+    hosts = "mongo0:27017"
+    database = ["inventory"]
+    collection = ["inventory.products"]
+    startup.mode = "latest"
+    schema = {
+      fields {
+        "_id" : string,
+        "name" : string,
+        "description" : string,
+        "weight" : string
+      }
+    }
+  }
+}
+```
+
 ### Tips
 
 > 1. If the collection changes slowly, it is strongly recommended to set `heartbeat.interval.ms` to an appropriate value greater than 0. When a SeaTunnel job is recovered from a checkpoint or savepoint, heartbeat events can push the resumeToken forward and keep it from expiring.<br/>
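The startup rules documented above reduce to a small decision: a restored checkpoint position always wins, and otherwise `startup.mode` decides whether a snapshot runs and where the change stream begins. A minimal Java sketch of that decision follows; `StartupPlanSketch`, `Plan`, and the string positions are hypothetical names rather than SeaTunnel classes, and rejecting `timestamp` mode without a `startup.timestamp` value is an assumption of the sketch, not documented behavior.

```java
// Hypothetical model of the documented startup rules; none of these names are SeaTunnel APIs.
final class StartupPlanSketch {

    // The three values the MongoDB CDC connector accepts for startup.mode.
    enum Mode { INITIAL, LATEST, TIMESTAMP }

    // Whether a snapshot phase runs, and where change-stream reading begins.
    static final class Plan {
        final boolean runSnapshot;
        final String startPosition;

        Plan(boolean runSnapshot, String startPosition) {
            this.runSnapshot = runSnapshot;
            this.startPosition = startPosition;
        }
    }

    private StartupPlanSketch() {}

    /**
     * @param mode                   configured startup.mode
     * @param startupTimestampMillis configured startup.timestamp, only read in TIMESTAMP mode
     * @param restoredPosition       position from a checkpoint or savepoint, null on a fresh start
     */
    static Plan plan(Mode mode, Long startupTimestampMillis, String restoredPosition) {
        // A restored job resumes from its checkpointed position regardless of startup.mode,
        // so a restart never triggers a new snapshot.
        if (restoredPosition != null) {
            return new Plan(false, "checkpoint:" + restoredPosition);
        }
        switch (mode) {
            case INITIAL:
                // Snapshot the monitored collections, then switch to the change stream.
                return new Plan(true, "after-snapshot");
            case LATEST:
                // Skip the snapshot; only changes made after the job starts are read.
                return new Plan(false, "latest");
            case TIMESTAMP:
                // Assumption of this sketch: TIMESTAMP without startup.timestamp is rejected.
                if (startupTimestampMillis == null) {
                    throw new IllegalArgumentException(
                            "startup.timestamp is required when startup.mode is timestamp");
                }
                return new Plan(false, "timestamp:" + startupTimestampMillis);
            default:
                throw new IllegalArgumentException("Unsupported startup mode: " + mode);
        }
    }
}
```

For example, `plan(Mode.INITIAL, null, "token-1")` skips the snapshot because a restored position is present, which matches the documented guarantee that a restart never falls back to a new snapshot.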
diff --git a/docs/zh/connectors/source/MongoDB-CDC.md b/docs/zh/connectors/source/MongoDB-CDC.md
index 0770da5054..83782ea380 100644
--- a/docs/zh/connectors/source/MongoDB-CDC.md
+++ b/docs/zh/connectors/source/MongoDB-CDC.md
@@ -127,9 +127,42 @@ db.grantRolesToUser("<USER_NAME>", ["<ROLE_NAME>"])
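 | poll.await.time.ms                 | Long   | No       | 1000  | The amount of time to wait before checking for new results on the change stream. |
 | heartbeat.interval.ms              | String | No       | 0     | The length of time (in milliseconds) between sending heartbeat messages. Use 0 to disable. |
 | incremental.snapshot.chunk.size.mb | Long   | No       | 64    | The chunk size (in MB) of the incremental snapshot. |
+| startup.mode                       | Enum   | No       | INITIAL | Optional startup mode for the MongoDB CDC consumer. Valid values are `initial`, `latest` and `timestamp`. See the [Startup Mode](#startup-mode) section below. |
+| startup.timestamp                  | Long   | No       | -     | Start consuming from the specified epoch timestamp (in milliseconds). Only used when `startup.mode` is `timestamp`. |
 | exactly_once                       | Boolean| No       | false | Enable exactly-once semantics. If enabled, recovery during the snapshot stage of a large table may run out of memory. |
 | common-options                     |        | No       | -     | Common source plugin parameters; please refer to [Source Common Options](../common-options/source-common-options.md). |
 
+### Startup Mode
+
+The `startup.mode` option controls where the connector starts reading when a job is submitted:
+
+- `initial` (default): first reads a snapshot of the monitored collections, then switches to the change stream.
+- `latest`: skips the snapshot entirely and starts from the latest change-stream position, capturing only changes made after the job starts. In this mode, snapshot-related options (such as `incremental.snapshot.chunk.size.mb`) are ignored.
+- `timestamp`: skips the snapshot and starts reading the change stream from the position specified by `startup.timestamp`.
+
+When a job is restored from a checkpoint or savepoint, it continues consuming from the change-stream position recorded in the checkpoint regardless of the value of `startup.mode`; a restart never falls back to re-running the snapshot.
+
+For example, to consume only changes made after the job starts: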
+
+```hocon
+source {
+  MongoDB-CDC {
+    hosts = "mongo0:27017"
+    database = ["inventory"]
+    collection = ["inventory.products"]
+    startup.mode = "latest"
+    schema = {
+      fields {
+        "_id" : string,
+        "name" : string,
+        "description" : string,
+        "weight" : string
+      }
+    }
+  }
+}
+```
+
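 ### Tips
 
 > 1. If the collection changes slowly, it is strongly recommended to set `heartbeat.interval.ms` to an appropriate value greater than 0. When a SeaTunnel job is recovered from a checkpoint or savepoint, heartbeat events can push the resumeToken forward and keep it from expiring.<br/>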
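Both doc files describe `startup.timestamp` as an epoch timestamp in milliseconds, whereas MongoDB change streams address cluster time with BSON timestamps (seconds since the epoch in the high 32 bits, an ordinal increment in the low 32 bits). A minimal sketch of that conversion, assuming the start position is derived by truncating to whole seconds; `BsonTimestampSketch` and the caller-chosen increment are hypothetical and are not taken from this commit.

```java
// Hypothetical helper, not part of SeaTunnel: packs epoch milliseconds into the
// 64-bit BSON Timestamp layout (high 32 bits = seconds, low 32 bits = increment).
final class BsonTimestampSketch {

    private BsonTimestampSketch() {}

    static long fromEpochMillis(long epochMillis, int increment) {
        if (epochMillis < 0) {
            throw new IllegalArgumentException("startup.timestamp must not be negative");
        }
        // BSON timestamps have whole-second precision, so the sub-second part is dropped.
        long seconds = epochMillis / 1000L;
        if (seconds > 0xFFFFFFFFL) {
            throw new IllegalArgumentException("seconds do not fit in 32 bits: " + seconds);
        }
        // Mask the increment so a negative int is treated as an unsigned 32-bit value.
        return (seconds << 32) | (increment & 0xFFFFFFFFL);
    }

    // Recover the seconds component from the packed value.
    static long seconds(long bsonTimestamp) {
        return bsonTimestamp >>> 32;
    }

    // Recover the increment component from the packed value.
    static long increment(long bsonTimestamp) {
        return bsonTimestamp & 0xFFFFFFFFL;
    }
}
```

Because the sub-second part is dropped, `1500` ms and `1000` ms map to the same position, so changes made earlier in that same second may also be delivered.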
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbIncrementalSourceOptions.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbIncrementalSourceOptions.java
index 1a8ac0a5e6..ff2449e15d 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbIncrementalSourceOptions.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbIncrementalSourceOptions.java
@@ -136,11 +136,15 @@ public class MongodbIncrementalSourceOptions extends SourceOptions implements Ta
             Options.key(SourceOptions.STARTUP_MODE_KEY)
                     .singleChoice(
                             StartupMode.class,
-                            Arrays.asList(StartupMode.INITIAL, StartupMode.TIMESTAMP))
+                            Arrays.asList(
+                                    StartupMode.INITIAL, StartupMode.LATEST, StartupMode.TIMESTAMP))
                     .defaultValue(StartupMode.INITIAL)
                     .withDescription(
-                            "Optional startup mode for CDC source, valid enumerations are "
-                                    + "\"initial\", \"earliest\", \"latest\", \"timestamp\"\n or \"specific\"");
+                            "Optional startup mode for MongoDB CDC source, valid enumerations are "
+                                    + "\"initial\", \"latest\" or \"timestamp\". "
+                                    + "\"initial\": reads a snapshot of the monitored collections first and then switches to the change stream. "
+                                    + "\"latest\": skips the snapshot entirely and starts from the latest change-stream position, so only changes made after the job starts are captured. "
+                                    + "\"timestamp\": skips the snapshot and starts reading the change stream from the position given by \"startup.timestamp\".");
 
     public static final SingleChoiceOption<StopMode> STOP_MODE =
             Options.key(SourceOptions.STOP_MODE_KEY)
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceConfigProvider.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceConfigProvider.java
index cbb835129a..e7a69c5e0b 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceConfigProvider.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceConfigProvider.java
@@ -111,6 +111,7 @@ public class MongodbSourceConfigProvider {
         public Builder startupOptions(StartupConfig startupOptions) {
             this.startupOptions = Objects.requireNonNull(startupOptions);
             if (startupOptions.getStartupMode() != StartupMode.INITIAL
+                    && startupOptions.getStartupMode() != StartupMode.LATEST
                     && startupOptions.getStartupMode() != StartupMode.TIMESTAMP) {
                 throw new MongodbConnectorException(
                         ILLEGAL_ARGUMENT,
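The builder guard above lists the accepted modes with chained `!=` checks, and the regression test in this commit notes that the builder once rejected `StartupMode.LATEST` even though the option rule advertised it. One way to keep the two lists from drifting is a single shared set. A sketch under that assumption; `StartupModeGuardSketch` and its local `Mode` enum (mirroring the constants named in the old option description) are hypothetical, and `IllegalArgumentException` stands in for `MongodbConnectorException`.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical sketch: one shared set of supported startup modes, so the option rule
// and the config builder cannot disagree about what is allowed.
final class StartupModeGuardSketch {

    // Local stand-in for StartupMode, using the constants named in the old option description.
    enum Mode { INITIAL, EARLIEST, LATEST, TIMESTAMP, SPECIFIC }

    // Single source of truth; EnumSet iterates in declaration order.
    static final Set<Mode> SUPPORTED =
            Collections.unmodifiableSet(EnumSet.of(Mode.INITIAL, Mode.LATEST, Mode.TIMESTAMP));

    private StartupModeGuardSketch() {}

    static Mode requireSupported(Mode mode) {
        Objects.requireNonNull(mode, "startup mode");
        if (!SUPPORTED.contains(mode)) {
            // The real builder throws MongodbConnectorException(ILLEGAL_ARGUMENT, ...) instead.
            throw new IllegalArgumentException(
                    "Unsupported startup mode " + mode + ", expected one of " + SUPPORTED);
        }
        return mode;
    }
}
```

An `EnumSet` keeps membership checks cheap and iterates in declaration order, so the same constant could back both the advertised choices and the builder check.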
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/source/MongodbIncrementalSourceFactoryTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/source/MongodbIncrementalSourceFactoryTest.java
index 8720c7556c..7fe549f80a 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/source/MongodbIncrementalSourceFactoryTest.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/source/MongodbIncrementalSourceFactoryTest.java
@@ -18,9 +18,16 @@
 package mongodb.source;
 
 import org.apache.seatunnel.api.configuration.SingleChoiceOption;
+import org.apache.seatunnel.connectors.cdc.base.config.StartupConfig;
 import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
 import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
+import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
 import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.MongodbIncrementalSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceConfigProvider;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffset;
+import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffsetFactory;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -34,7 +41,7 @@ public class MongodbIncrementalSourceFactoryTest {
     }
 
     @Test
-    public void testWithUnsupportedStartUpMode() {
+    public void testSupportedStartUpModes() {
         MongodbIncrementalSourceFactory mongodbIncrementalSourceFactory =
                 new MongodbIncrementalSourceFactory();
         mongodbIncrementalSourceFactory.optionRule().getOptionalOptions().stream()
@@ -42,8 +49,48 @@ public class MongodbIncrementalSourceFactoryTest {
                 .forEach(
                         (option) -> {
                             Assertions.assertIterableEquals(
-                                    Arrays.asList(StartupMode.INITIAL, StartupMode.TIMESTAMP),
+                                    Arrays.asList(
+                                            StartupMode.INITIAL,
+                                            StartupMode.LATEST,
+                                            StartupMode.TIMESTAMP),
                                     ((SingleChoiceOption<StartupMode>) option).getOptionValues());
                         });
     }
+
+    @Test
+    public void testSourceConfigBuilderAcceptsLatestStartupMode() {
+        // Regression for the real source-assembly path: the builder used to reject
+        // StartupMode.LATEST at runtime even though the option rule advertised it.
+        MongodbSourceConfig config =
+                MongodbSourceConfigProvider.newBuilder()
+                        .hosts("localhost:27017")
+                        .startupOptions(new StartupConfig(StartupMode.LATEST, null, null, null))
+                        .validate()
+                        .create(0);
+
+        Assertions.assertEquals(StartupMode.LATEST, config.getStartupConfig().getStartupMode());
+    }
+
+    @Test
+    public void testSourceConfigBuilderRejectsUnsupportedStartupMode() {
+        Assertions.assertThrows(
+                MongodbConnectorException.class,
+                () ->
+                        MongodbSourceConfigProvider.newBuilder()
+                                .startupOptions(
+                                        new StartupConfig(StartupMode.EARLIEST, null, null, null)));
+    }
+
+    @Test
+    public void testLatestStartupModeResolvesToLatestChangeStreamOffset() {
+        StartupConfig startupConfig = new StartupConfig(StartupMode.LATEST, null, null, null);
+
+        Offset startupOffset = startupConfig.getStartupOffset(new ChangeStreamOffsetFactory());
+
+        Assertions.assertInstanceOf(ChangeStreamOffset.class, startupOffset);
+        // The latest offset is a current-time change-stream position, not a resume token from a
+        // snapshot: starting here means only changes made after the job starts are consumed.
+        Assertions.assertNotNull(((ChangeStreamOffset) startupOffset).getTimestamp());
+        Assertions.assertNull(((ChangeStreamOffset) startupOffset).getResumeToken());
+    }
 }
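`testLatestStartupModeResolvesToLatestChangeStreamOffset` checks that a `LATEST` startup offset carries a timestamp but no resume token. The hypothetical model below has the same shape and takes the current time from an injected `java.time.Clock`; `ChangeStreamPositionSketch` is illustrative only and is not SeaTunnel's `ChangeStreamOffset`.

```java
import java.time.Clock;
import java.util.Objects;

// Hypothetical model of a change-stream start position; not SeaTunnel's ChangeStreamOffset.
final class ChangeStreamPositionSketch {

    // Epoch millis to start from when no resume token is available.
    final Long timestampMillis;
    // Token recorded by a previous run; when present it identifies the exact resume point.
    final String resumeToken;

    private ChangeStreamPositionSketch(Long timestampMillis, String resumeToken) {
        this.timestampMillis = timestampMillis;
        this.resumeToken = resumeToken;
    }

    // "latest": the current time and no token, so only changes after startup are read.
    static ChangeStreamPositionSketch latest(Clock clock) {
        return new ChangeStreamPositionSketch(clock.millis(), null);
    }

    // Restored from a checkpoint: resume right after the recorded token.
    static ChangeStreamPositionSketch fromResumeToken(String resumeToken) {
        return new ChangeStreamPositionSketch(null, Objects.requireNonNull(resumeToken));
    }
}
```

Injecting the clock is a design choice of the sketch: it lets a test pin the exact start time, whereas the commit's test, which resolves the offset from the current time, can only assert that the timestamp is non-null.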
