hanksong opened a new issue, #9511:
URL: https://github.com/apache/seatunnel/issues/9511

   ### Search before asking
   
   - [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   ### What happened
   
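   In cluster mode, when the mongo-cdc connector is used to feed data into the console sink, the job writes the increments that existed before startup exactly once after it starts, and then hangs in the pure increment phase.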
   
   ### SeaTunnel Version
   
   2.3.11
   
   ### SeaTunnel Config
   
   ```conf
   source {
     MongoDB-CDC {
       hosts = "mongo:27017"
       database = ["testdb"]
       collection = ["testdb.position_records"]
       username = "stuser"
       password = "stpw"
       schema = {
         table = "testdb.position_records"
         fields {
           _id         = string
           account_id  = string
           api         = string
           gen_time    = long
           request_id  = int
           result      = array<string>
           # result      = string
           result_type = int
           src_host_name = string
         }
       }
       plugin_output = "mongo_source"
       connection.options = "authSource=admin&directConnection=true"
       heartbeat.interval.ms = 5000
     }
   }
   ```
   
   ### Running Command
   
   ```shell
   bin/seatunnel.sh --config /opt/seatunnel/config/seatunnel_mongo.conf --engine zeta
   ```
   
   ### Error Exception
   
   ```log
   2025-06-27 09:14:41,505 INFO  [.b.s.r.IncrementalSourceReader] [hz.main.generic-operation.thread-0] - The incremental split[incremental-split-0] startup position {resumeToken={"_data": "82685E60F9000000032B0429296E1404"}, timestamp=7520555050318430211} is equal the maxSnapshotSplitsHighWatermark {resumeToken={"_data": "82685E60F9000000032B0429296E1404"}, timestamp=7520555050318430211}, auto enter pure increment phase.
   2025-06-27 09:14:41,505 INFO  [.b.s.r.IncrementalSourceReader] [hz.main.generic-operation.thread-0] - Clean the IncrementalSplit#completedSnapshotSplitInfos to empty.
   2025-06-27 09:14:48,327 INFO  [.s.e.s.c.CheckpointCoordinator] [seatunnel-coordinator-service-9] - wait checkpoint completed: 1
   ```
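
For anyone triaging the log above, the following is a minimal sketch for reading the `timestamp` value. It assumes the value is a 64-bit BSON timestamp (high 32 bits: seconds since the Unix epoch, low 32 bits: an ordinal within that second), and that the hex in `_data` carries the same two fields right after a leading type byte. The second point reflects an internal token layout and is an assumption, not documented behavior. The helper names `decodeBsonTimestamp` and `decodeResumeTokenPrefix` are hypothetical.

```javascript
// Split a 64-bit BSON timestamp into its two 32-bit halves.
// The value is passed as a string because it exceeds Number.MAX_SAFE_INTEGER.
function decodeBsonTimestamp(value) {
  const ts = BigInt(value);
  const seconds = Number(ts >> 32n);          // high 32 bits: seconds since the Unix epoch
  const increment = Number(ts & 0xffffffffn); // low 32 bits: ordinal within that second
  return { seconds, increment, iso: new Date(seconds * 1000).toISOString() };
}

// Assumption: after one leading type byte, the token hex holds
// 4 bytes of seconds followed by 4 bytes of increment.
function decodeResumeTokenPrefix(hex) {
  return {
    seconds: parseInt(hex.slice(2, 10), 16),
    increment: parseInt(hex.slice(10, 18), 16),
  };
}

const fromLog = decodeBsonTimestamp("7520555050318430211");
const fromToken = decodeResumeTokenPrefix("82685E60F9000000032B0429296E1404");
console.log(fromLog, fromToken);
```

Under these assumptions both values decode to the same second (2025-06-27T09:14:33Z) with increment 3, which is a few seconds before the log line itself. That is consistent with the reader starting from the position captured at job startup.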
   
   ```js
   function sleep(ms) {
     return new Promise(resolve => setTimeout(resolve, ms));
   }
   
   async function main() {
     const dbName = 'testdb';
     const userName = 'stuser';
     const password = 'stpw';
   
     print("Waiting for primary...");
     // Wait until this node is the primary
     while (!db.isMaster().ismaster) {
       // sleep() returns a Promise, so it must be awaited to actually pause
       await sleep(1000);
     }
     print("Primary is ready.");
   
     const testDb = db.getSiblingDB(dbName);
     const adminDb = db.getSiblingDB('admin');
   
     // Ensure role exists in testdb
     const roleExists = testDb.getRole('strole', { showPrivileges: false });
     if (!roleExists) {
         print("Creating role 'strole' in testdb...");
         testDb.createRole({
             role: "strole",
             privileges: [{
                 resource: { db: dbName, collection: "" },
                 actions: ["listDatabases", "listCollections", "splitVector", "collStats", "find", "changeStream"]
             }],
             roles: []
         });
         print("Role 'strole' created.");
     } else {
         print("Role 'strole' already exists in testdb.");
     }
     
     // Create user in admin db
     // getUser() returns null when the user does not exist
     const userExists = adminDb.getUser(userName) !== null;
   
     if (userExists) {
         print("User 'stuser' already exists in admin db.");
     } else {
         print("Creating user 'stuser' in admin db...");
         adminDb.createUser({
             user: userName,
             pwd: password,
             roles: [
                 { role: 'strole', db: dbName },
                 { role: 'read', db: 'config' },
                 { role: 'clusterManager', db: 'admin' }
             ]
         });
         print("User 'stuser' created successfully in admin db.");
     }
   }
   
   main();
   ```
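
To check whether new change events reach the sink once the job is in the pure increment phase, a probe document can be inserted while the job is running. The snippet below is a minimal sketch, not part of the original setup script: `makePositionRecord` is a hypothetical helper, all field values are made up, and the insert must be run in the mongo shell as a user with write access (the `strole` role above only grants read-side actions).

```javascript
// Build a document whose fields mirror the schema block in the SeaTunnel config.
// All values are made up for illustration.
function makePositionRecord(i) {
  return {
    _id: `probe-${i}`,
    account_id: `acct-${i}`,
    api: "probe",
    // Stored as a double by default in the shell; wrap in NumberLong(...)
    // if a 64-bit integer is required to match `gen_time = long`.
    gen_time: Date.now(),
    request_id: i,
    result: ["ok"],
    result_type: 0,
    src_host_name: "probe-host",
  };
}

// In the mongo shell, `db` is a global; elsewhere (e.g. plain Node.js) the insert is skipped.
if (typeof db !== "undefined") {
  const coll = db.getSiblingDB("testdb").getCollection("position_records");
  coll.insertOne(makePositionRecord(1));
  print("Inserted one probe document into testdb.position_records");
}
```

If the console sink prints nothing after the insert, that narrows the problem to the incremental reader rather than to a lack of new events.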
   
   ### Zeta or Flink or Spark Version
   
   Zeta.
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   
![Image](https://github.com/user-attachments/assets/62abdd9a-1611-466b-b25a-3db30835fb80)
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   

