jw-itq opened a new issue, #7661: URL: https://github.com/apache/seatunnel/issues/7661
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

When using mongo-cdc for synchronization, the historical phase works fine, but the incremental phase fails after running for a short time. Restoring the failed job with the `-r` flag also fails. Please help investigate.

### SeaTunnel Version

dev

### SeaTunnel Config

```conf
env {
  parallelism = 4
  job.mode = "STREAMING"
  checkpoint.interval = 1000
  checkpoint.timeout = 300000
}

source {
  MongoDB-CDC {
    hosts = "192.168.0.2:27017,192.168.0.3:27017,192.168.0.4:27017/test?connectTimeoutMS=600000"
    database = ["test"]
    collection = ["test.mongo_test"]
    username = "test"
    password = "test@123"
    schema = {
      fields {
        "_id" : STRING,
        "_class" : STRING,
        "creationTime" : TIMESTAMP,
        "modificationTime" : TIMESTAMP,
        "year_" : INT,
        "month_" : INT,
        "day_of_month" : INT,
        "quarter_" : INT,
        "week_" : INT,
        "day_of_week" : INT,
        "day_time" : BIGINT,
        "hour_of_day" : INT,
        "test1" : INT,
        "test2" : INT,
        "test3" : STRING,
        "test4" : INT,
        "test5" : STRING,
        "test6" : INT,
        "test7" : STRING,
        "code" : STRING,
        "tset8" : STRING,
        "test9" : STRING,
        "ip" : STRING,
        "test10" : INT
      }
    }
    startup.mode = timestamp
    startup.timestamp = "1725865996000"
    result_table_name = "source_mongo_test"
  }
}

transform {
  Sql {
    source_table_name = "source_mongo_test"
    result_table_name = "result_mongo_test"
    query = "select *, Ip2regionUDF(ip) as ipr from source_mongo_test where creationTime is not null"
  }
}

sink {
  StarRocks {
    source_table_name = "result_mongo_test"
    nodeUrls = ["127.0.0.1:8030"]
    username = "root"
    password = "123456"
    database = "ltest"
    table = "mongo_test"
    base-url = "jdbc:mysql://127.0.0.1:9030/ltest"
    max_retries = 3
    enable_upsert_delete = true
  }
}
```

### Running Command

```shell
seatunnel.sh -c mongo_test.conf --async
```

### Error Exception

```log
2024-09-13 19:36:48,451 ERROR [.s.c.s.r.f.SplitFetcherManager] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=886872658728189957, pipelineId=1, taskGroupId=30000}] - Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[seatunnel-transforms-v2-2.3.7.jar:2.3.7]
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81) ~[seatunnel-transforms-v2-2.3.7.jar:2.3.7]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_412]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_412]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_412]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_412]
	at java.lang.Thread.run(Thread.java:750) [?:1.8.0_412]
Caused by: org.apache.kafka.connect.errors.DataException: source is not a valid field name
	at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254) ~[?:?]
	at org.apache.kafka.connect.data.Struct.getCheckType(Struct.java:261) ~[?:?]
--
	at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher.shouldEmit(IncrementalSourceStreamFetcher.java:215) ~[?:?]
	at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher.splitNormalStream(IncrementalSourceStreamFetcher.java:146) ~[?:?]
	at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher.pollSplitRecords(IncrementalSourceStreamFetcher.java:135) ~[?:?]
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:75) ~[?:?]
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.FetchTask.run(FetchTask.java:54) ~[seatunnel-transforms-v2-2.3.7.jar:2.3.7]
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162) ~[seatunnel-transforms-v2-2.3.7.jar:2.3.7]
	... 6 more
2024-09-13 19:36:48,451 INFO [r.IncrementalSourceSplitReader] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=886872658728189957, pipelineId=1, taskGroupId=30000}] - Close current fetcher org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher
2024-09-13 19:36:48,549 INFO [o.a.s.c.s.c.s.r.f.SplitFetcher] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=886872658728189957, pipelineId=1, taskGroupId=30000}] - Split fetcher 0 exited.
2024-09-13 19:36:48,598 WARN [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=886872658728189957, pipelineId=1, taskGroupId=30000}] - [172.18.3.56]:5802 [aliyun-seatunnel-main] [5.1] Exception in org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask@50fd5feb
java.lang.RuntimeException: One or more fetchers have encountered exception
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:147) ~[seatunnel-transforms-v2-2.3.7.jar:2.3.7]
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:167) ~[seatunnel-transforms-v2-2.3.7.jar:2.3.7]
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:93) ~[seatunnel-transforms-v2-2.3.7.jar:2.3.7]
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119) ~[?:?]
	at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159) ~[seatunnel-starter.jar:2.3.8-SNAPSHOT]
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127) ~[seatunnel-starter.jar:2.3.8-SNAPSHOT]
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168) ~[seatunnel-starter.jar:2.3.8-SNAPSHOT]
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132) ~[seatunnel-starter.jar:2.3.8-SNAPSHOT]
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:681) ~[seatunnel-starter.jar:2.3.8-SNAPSHOT]
	at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1003) ~[seatunnel-starter.jar:2.3.8-SNAPSHOT]
	at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39) ~[seatunnel-starter.jar:2.3.8-SNAPSHOT]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_412]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_412]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_412]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_412]
	at java.lang.Thread.run(Thread.java:750) [?:1.8.0_412]
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[seatunnel-transforms-v2-2.3.7.jar:2.3.7]
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81) ~[seatunnel-transforms-v2-2.3.7.jar:2.3.7]
	... 5 more
Caused by: org.apache.kafka.connect.errors.DataException: source is not a valid field name
	at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254) ~[?:?]
	at org.apache.kafka.connect.data.Struct.getCheckType(Struct.java:261) ~[?:?]
	at org.apache.kafka.connect.data.Struct.getStruct(Struct.java:191) ~[?:?]
	at org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.getTableId(SourceRecordUtils.java:118) ~[?:?]
	at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher.shouldEmit(IncrementalSourceStreamFetcher.java:215) ~[?:?]
	at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher.splitNormalStream(IncrementalSourceStreamFetcher.java:146) ~[?:?]
	at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher.pollSplitRecords(IncrementalSourceStreamFetcher.java:135) ~[?:?]
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:75) ~[?:?]
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.FetchTask.run(FetchTask.java:54) ~[seatunnel-transforms-v2-2.3.7.jar:2.3.7]
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162) ~[seatunnel-transforms-v2-2.3.7.jar:2.3.7]
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81) ~[seatunnel-transforms-v2-2.3.7.jar:2.3.7]
	... 5 more
2024-09-13 19:36:48,598 INFO [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=886872658728189957, pipelineId=1, taskGroupId=30000}] - [172.18.3.56]:5802 [aliyun-seatunnel-main] [5.1] taskDone, taskId = 40000, taskGroup = TaskGroupLocation{jobId=886872658728189957, pipelineId=1, taskGroupId=30000}
2024-09-13 19:36:48,598 INFO [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=886872658728189957, pipelineId=1, taskGroupId=30000}] - [172.18.3.56]:5802 [aliyun-seatunnel-main] [5.1] task 40000 error with exception: [java.lang.RuntimeException: One or more fetchers have encountered exception], cancel other task in taskGroup TaskGroupLocation{jobId=886872658728189957, pipelineId=1, taskGroupId=30000}.
2024-09-13 19:36:48,598 WARN [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=886872658728189957, pipelineId=1, taskGroupId=30000}] - [172.18.3.56]:5802 [aliyun-seatunnel-main] [5.1] Interrupted task 50000 - org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask@142e1eb9
2024-09-13 19:36:48,598 INFO [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=886872658728189957, pipelineId=1, taskGroupId=30000}] - [172.18.3.56]:5802 [aliyun-seatunnel-main] [5.1] taskDone, taskId = 50000, taskGroup = TaskGroupLocation{jobId=886872658728189957, pipelineId=1, taskGroupId=30000}
2024-09-13 19:36:48,598 INFO [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=886872658728189957, pipelineId=1, taskGroupId=30000}] - Release classloader for job 886872658728189957 with jars [file:/data/seatunnel/apache-seatunnel-2.3.8-SNAPSHOT/connectors/connector-cdc-mongodb-2.3.8-SNAPSHOT.jar, file:/data/seatunnel/apache-seatunnel-2.3.8-SNAPSHOT/connectors/seatunnel-transforms-v2-2.3.8-SNAPSHOT.jar, file:/data/seatunnel/apache-seatunnel-2.3.8-SNAPSHOT/connectors/connector-starrocks-2.3.8-SNAPSHOT.jar]
2024-09-13 19:36:48,604 INFO [a.s.c.s.c.s.r.SourceReaderBase] [BlockingWorker-TaskGroupLocation{jobId=886872658728189957, pipelineId=1, taskGroupId=30000}] - Closing Source Reader 0.
2024-09-13 19:36:48,607 INFO [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=886872658728189957, pipelineId=1, taskGroupId=30000}] - recycle classloader for thread ForkJoinPool.commonPool-worker-1
2024-09-13 19:36:48,607 INFO [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=886872658728189957, pipelineId=1, taskGroupId=30000}] - recycle classloader for thread BlockingWorker-TaskGroupLocation{jobId=886872658728189957, pipelineId=1, taskGroupId=30000}
2024-09-13 19:36:48,607 INFO [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=886872658728189957, pipelineId=1, taskGroupId=30000}] - recycle classloader for thread hz.main.seaTunnel.task.thread-806
2024-09-13 19:36:48,702 INFO [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=886872658728189957, pipelineId=1, taskGroupId=30000}] - [172.18.3.56]:5802 [aliyun-seatunnel-main] [5.1] taskGroup TaskGroupLocation{jobId=886872658728189957, pipelineId=1, taskGroupId=30000} complete with FAILED
```

### Zeta or Flink or Spark Version

_No response_

### Java or Scala Version

openjdk version "1.8.0_352"

### Screenshots

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
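The job starts its incremental read from `startup.timestamp = "1725865996000"`. The short Java sketch below decodes that value so it can be compared with the log timestamps. It assumes the MongoDB-CDC connector interprets the value as milliseconds since the Unix epoch, which the 13-digit value suggests. The class and method names are hypothetical, and `Asia/Shanghai` is only an assumed server time zone, since the log lines carry no zone.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

// Hypothetical helper: decodes a MongoDB-CDC startup.timestamp value,
// ASSUMING it is expressed in epoch milliseconds.
final class StartupTimestamp {

    // Parses the quoted config value (e.g. "1725865996000") as epoch milliseconds.
    static Instant toInstant(String startupTimestamp) {
        return Instant.ofEpochMilli(Long.parseLong(startupTimestamp.trim()));
    }

    // Renders the same instant in a given zone so it can be compared with log lines.
    static ZonedDateTime inZone(String startupTimestamp, String zoneId) {
        return toInstant(startupTimestamp).atZone(ZoneId.of(zoneId));
    }

    public static void main(String[] args) {
        String configured = "1725865996000"; // value copied from the job config
        System.out.println(toInstant(configured));               // prints 2024-09-09T07:13:16Z
        // "Asia/Shanghai" is an assumed server zone, not taken from the report.
        System.out.println(inZone(configured, "Asia/Shanghai")); // prints 2024-09-09T15:13:16+08:00[Asia/Shanghai]
    }
}
```

Under that assumption, the job reads changes starting from 2024-09-09T07:13:16Z, about four days before the 2024-09-13 failure recorded in the log.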
