yjsdev opened a new issue, #11008:
URL: https://github.com/apache/seatunnel/issues/11008

   ### Search before asking
   
   - [x] I had searched in the 
[issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22)
 and found no similar issues.
   
   
   ### What happened
   
   2.3.12版本、2.3.13版本均出现Mongo-CDC同步时,Caused by: 
org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: 
org.apache.kafka.connect.errors.DataException: operationType is not a valid 
field name
   
   例如test-db to test-db2,表t_sys_user,
   数据示例:
   db.t_sys_user.insertOne({
     "_id":"12",
     phone: "47873454385",
     name: "直接写入测试8"
   })
   env {
     job.mode = "STREAMING"
     checkpoint.interval = 10000
     parallelism = 1
   }
   
   
   ### SeaTunnel Version
   
   2.3.12版本、2.3.13版本
   
   ### SeaTunnel Config
   
   ```conf
   conf示例:
   source {
     MongoDB-CDC {
       # ✅ 建议:在hosts和connection.options中都进行配置,以确保正确连接到副本集
       hosts = "127.0.0.1:27017"
       # 在URI中指定副本集名称和读偏好,强制从Primary读取,确保能获取完整的Change Stream数据
       connection.options = "replicaSet=rs0&readPreference=primary"
   
       database = ["test-db"]
       collection = ["test-db.t_sys_user"]
   
       # ✅ 关键优化:设置一个大于0的心跳间隔,防止resume token过期
       # 官方建议当集合变更慢时强烈建议设置此参数 [citation:3]
       heartbeat.interval.ms = 10000
       schema = {
         table = "test-db.t_sys_user"
         fields {
           "_id" : string
           "phone" : string,
           "name" : string
         }
       }
     }
   }
   
   sink {
     MongoDB {
       uri = "mongodb://127.0.0.1:27017"
       database = "test-db2"
       collection = "t_sys_user"
       # 🔑 关键修改:使用 distributed_id 作为主键
       primary-key = ["_id"]
       # 兼容旧版本的写法
       #upsert-key = ["_id"]
       upsert-enable = true
       buffer-flush.max-rows = 1000
       buffer-flush.interval = 10000
     }
   }
   ```
   
   ### Running Command
   
   ```shell
   String configurePath = args.length > 0 ? args[0] : 
"/examples/mongoA_to_mongoB.conf";
   ```
   
   ### Error Exception
   
   ```log
   ERROR org.apache.seatunnel.core.starter.SeaTunnel - Exception 
StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: 
SeaTunnel job executed failed
        at 
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at 
org.apache.seatunnel.example.engine.SeaTunnelEngineLocalExample.main(SeaTunnelEngineLocalExample.java:49)
   Caused by: 
org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: 
org.apache.kafka.connect.errors.DataException: operationType is not a valid 
field name
        at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
        at org.apache.kafka.connect.data.Struct.getCheckType(Struct.java:261)
        at org.apache.kafka.connect.data.Struct.getString(Struct.java:158)
        at 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.sender.MongoDBConnectorDeserializationSchema.operationTypeFor(MongoDBConnectorDeserializationSchema.java:183)
        at 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.sender.MongoDBConnectorDeserializationSchema.deserialize(MongoDBConnectorDeserializationSchema.java:107)
        at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:201)
        at 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.MongoDBRecordEmitter.processElement(MongoDBRecordEmitter.java:79)
        at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:102)
        at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:62)
        at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:110)
        at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
        at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:114)
        at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
        at 
org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)
        at 
org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
        at 
org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
        at 
org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:679)
        at 
org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1008)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
        at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:842)
   
        at 
org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)
        ... 2 more
   ```
   
   ### Zeta or Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to