yjsdev opened a new issue, #11008: URL: https://github.com/apache/seatunnel/issues/11008
### Search before asking

- [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

In both 2.3.12 and 2.3.13, MongoDB-CDC synchronization fails with `Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.kafka.connect.errors.DataException: operationType is not a valid field name`.

Example: syncing `test-db` to `test-db2`, collection `t_sys_user`. Sample data: `db.t_sys_user.insertOne({ "_id": "12", phone: "47873454385", name: "直接写入测试8" })`

Job `env` settings: `job.mode = "STREAMING"`, `checkpoint.interval = 10000`, `parallelism = 1`.

### SeaTunnel Version

2.3.12 and 2.3.13

### SeaTunnel Config

```conf
# Example config:
source {
  MongoDB-CDC {
    # ✅ Recommendation: configure both hosts and connection.options to make sure the connection goes to the replica set
    hosts = "127.0.0.1:27017"
    # Specify the replica set name and read preference in the URI, forcing reads from the primary so the complete Change Stream data is available
    connection.options = "replicaSet=rs0&readPreference=primary"
    database = ["test-db"]
    collection = ["test-db.t_sys_user"]
    # ✅ Key optimization: set a heartbeat interval greater than 0 to keep the resume token from expiring
    # The official docs strongly recommend setting this parameter when the collection changes slowly [citation:3]
    heartbeat.interval.ms = 10000
    schema = {
      table = "test-db.t_sys_user"
      fields {
        "_id" : string
        "phone" : string
        "name" : string
      }
    }
  }
}

sink {
  MongoDB {
    uri = "mongodb://127.0.0.1:27017"
    database = "test-db2"
    collection = "t_sys_user"
    # 🔑 Key change: use distributed_id as the primary key
    primary-key = ["_id"]
    # Legacy syntax for older versions
    #upsert-key = ["_id"]
    upsert-enable = true
    buffer-flush.max-rows = 1000
    buffer-flush.interval = 10000
  }
}
```

### Running Command

Launched through `SeaTunnelEngineLocalExample`, which selects the config file as follows:

```java
String configurePath = args.length > 0 ? args[0] : "/examples/mongoA_to_mongoB.conf";
```

### Error Exception

```log
ERROR org.apache.seatunnel.core.starter.SeaTunnel - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.example.engine.SeaTunnelEngineLocalExample.main(SeaTunnelEngineLocalExample.java:49)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.kafka.connect.errors.DataException: operationType is not a valid field name
	at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
	at org.apache.kafka.connect.data.Struct.getCheckType(Struct.java:261)
	at org.apache.kafka.connect.data.Struct.getString(Struct.java:158)
	at org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.sender.MongoDBConnectorDeserializationSchema.operationTypeFor(MongoDBConnectorDeserializationSchema.java:183)
	at org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.sender.MongoDBConnectorDeserializationSchema.deserialize(MongoDBConnectorDeserializationSchema.java:107)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:201)
	at org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.MongoDBRecordEmitter.processElement(MongoDBRecordEmitter.java:79)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:102)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:62)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:110)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:114)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
	at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:679)
	at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1008)
	at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:842)
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)
	... 2 more
```

### Zeta or Flink or Spark Version

_No response_

### Java or Scala Version

_No response_

### Screenshots

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
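The trace shows `MongoDBConnectorDeserializationSchema.operationTypeFor` calling Kafka Connect's `Struct.getString`, which fails in `Struct.lookupField` because the record's schema has no `operationType` field. Below is a minimal, stdlib-only sketch of that failure mode next to a guarded lookup, under loud assumptions: the record value is modeled as a plain `Map` instead of a Kafka Connect `Struct`, and `OperationTypeLookup`, `strictOperationType`, `guardedOperationType`, and `otherField` are hypothetical names, not SeaTunnel or Kafka code. It does not establish which records reach the deserializer without an `operationType` field.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical, stdlib-only model: a change record's value is a plain Map
// standing in for a Kafka Connect Struct. This is not SeaTunnel or Kafka code.
public class OperationTypeLookup {

    static final String OPERATION_TYPE = "operationType";

    // Strict lookup: throws when the field is absent, mimicking the
    // "<name> is not a valid field name" message seen in the stack trace.
    public static String strictOperationType(Map<String, Object> value) {
        if (!value.containsKey(OPERATION_TYPE)) {
            throw new IllegalArgumentException(OPERATION_TYPE + " is not a valid field name");
        }
        return String.valueOf(value.get(OPERATION_TYPE));
    }

    // Guarded lookup: checks for the field first and returns empty when it is
    // missing (or not a String), so a caller can skip the record instead of failing.
    public static Optional<String> guardedOperationType(Map<String, Object> value) {
        Object op = value.get(OPERATION_TYPE);
        if (op instanceof String) {
            return Optional.of((String) op);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Hypothetical sample records; "otherField" is a made-up placeholder.
        Map<String, Object> withOp = Map.of(OPERATION_TYPE, "insert", "otherField", "x");
        Map<String, Object> withoutOp = Map.of("otherField", "x");

        System.out.println(guardedOperationType(withOp).orElse("<skipped>"));    // insert
        System.out.println(guardedOperationType(withoutOp).orElse("<skipped>")); // <skipped>

        try {
            strictOperationType(withoutOp);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // operationType is not a valid field name
        }
    }
}
```

Whether such records should be skipped or should fail the job is a decision for the connector; the sketch only shows where a presence check would sit relative to the failing lookup.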
