jw-itq opened a new issue, #7661:
URL: https://github.com/apache/seatunnel/issues/7661

   ### Search before asking
   
   - [X] I had searched in the 
[issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22)
 and found no similar issues.
   
   
   ### What happened
   
   When using MongoDB-CDC for synchronization, the snapshot (historical) phase 
completes fine, but the incremental phase fails after running for a short 
period of time. After the failure, attempts to recover the job with the -r 
flag also fail. Please help investigate.
   
   
   ### SeaTunnel Version
   
   dev
   
   ### SeaTunnel Config
   
   ```conf
   env {
     parallelism = 4
     job.mode = "STREAMING"
     checkpoint.interval = 1000
     checkpoint.timeout=300000
   }
   
   source {
     MongoDB-CDC {
       hosts = 
"192.168.0.2:27017,192.168.0.3:27017,192.168.0.4:27017/test?connectTimeoutMS=600000"
       database = ["test"]
       collection = ["test.mongo_test"]
       username = "test"
       password = "test@123"
       schema = {
         fields {
           "_id" : STRING,
           "_class" : STRING,
           "creationTime" : TIMESTAMP,
           "modificationTime" : TIMESTAMP,
           "year_" : INT,
           "month_" : INT,
           "day_of_month" : INT,
           "quarter_" : INT,
           "week_" : INT,
           "day_of_week" : INT,
           "day_time" : BIGINT,
           "hour_of_day" : INT,
           "test1" : INT,
           "test2" : INT,
           "test3" : STRING,
           "test4" : INT,
           "test5" : STRING,
           "test6" : INT,
           "test7" : STRING,
           "code" : STRING,
           "tset8" : STRING,
           "test9" : STRING,
           "ip" : STRING,
           "test10" : INT
         }
       }
      startup.mode = timestamp
      startup.timestamp = "1725865996000"
      result_table_name = "source_mongo_test"
     }
   }
   
   transform {
     Sql {
       source_table_name = "source_mongo_test"
       result_table_name = "result_mongo_test"
       query = "select *, Ip2regionUDF(ip) as ipr from source_mongo_test where 
creationTime is not null"
     }
   }
   
   sink {
     StarRocks {
       source_table_name = "result_mongo_test"
       nodeUrls = ["127.0.0.1:8030"]
       username = "root"
       password = "123456"
       database = "ltest"
       table = "mongo_test"
       base-url = "jdbc:mysql://127.0.0.1:9030/ltest"
       max_retries = 3
       enable_upsert_delete = true
     }
   }
   ```
   
   
   ### Running Command
   
   ```shell
   seatunnel.sh -c mongo_test.conf --async
   ```
   
   
   ### Error Exception
   
   ```log
   2024-09-13 19:36:48,451 ERROR [.s.c.s.r.f.SplitFetcherManager] [Source Data 
Fetcher for BlockingWorker-TaskGroupLocation{jobId=886872658728189957, 
pipelineId=1, taskGroupId=30000}] - Received uncaught exception.
   java.lang.RuntimeException: SplitFetcher thread 0 received unexpected 
exception while polling the records
           at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
 ~[seatunnel-transforms-v2-2.3.7.jar:2.3.7]
           at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81)
 ~[seatunnel-transforms-v2-2.3.7.jar:2.3.7]
           at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_412]
           at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
~[?:1.8.0_412]
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_412]
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_412]
           at java.lang.Thread.run(Thread.java:750) [?:1.8.0_412]
   Caused by: org.apache.kafka.connect.errors.DataException: source is not a 
valid field name
           at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254) 
~[?:?]
           at 
org.apache.kafka.connect.data.Struct.getCheckType(Struct.java:261) ~[?:?]
   --
           at 
org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher.shouldEmit(IncrementalSourceStreamFetcher.java:215)
 ~[?:?]
           at 
org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher.splitNormalStream(IncrementalSourceStreamFetcher.java:146)
 ~[?:?]
           at 
org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher.pollSplitRecords(IncrementalSourceStreamFetcher.java:135)
 ~[?:?]
           at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:75)
 ~[?:?]
           at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.FetchTask.run(FetchTask.java:54)
 ~[seatunnel-transforms-v2-2.3.7.jar:2.3.7]
           at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
 ~[seatunnel-transforms-v2-2.3.7.jar:2.3.7]
           ... 6 more
   2024-09-13 19:36:48,451 INFO  [r.IncrementalSourceSplitReader] [Source Data 
Fetcher for BlockingWorker-TaskGroupLocation{jobId=886872658728189957, 
pipelineId=1, taskGroupId=30000}] - Close current fetcher 
org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher
   2024-09-13 19:36:48,549 INFO  [o.a.s.c.s.c.s.r.f.SplitFetcher] [Source Data 
Fetcher for BlockingWorker-TaskGroupLocation{jobId=886872658728189957, 
pipelineId=1, taskGroupId=30000}] - Split fetcher 0 exited.
   2024-09-13 19:36:48,598 WARN  [o.a.s.e.s.TaskExecutionService] 
[BlockingWorker-TaskGroupLocation{jobId=886872658728189957, pipelineId=1, 
taskGroupId=30000}] - [172.18.3.56]:5802 [aliyun-seatunnel-main] [5.1] 
Exception in 
org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask@50fd5feb
   java.lang.RuntimeException: One or more fetchers have encountered exception
           at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:147)
 ~[seatunnel-transforms-v2-2.3.7.jar:2.3.7]
           at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:167)
 ~[seatunnel-transforms-v2-2.3.7.jar:2.3.7]
           at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:93)
 ~[seatunnel-transforms-v2-2.3.7.jar:2.3.7]
           at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
 ~[?:?]
           at 
org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)
 ~[seatunnel-starter.jar:2.3.8-SNAPSHOT]
           at 
org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
 ~[seatunnel-starter.jar:2.3.8-SNAPSHOT]
           at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
 ~[seatunnel-starter.jar:2.3.8-SNAPSHOT]
           at 
org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
 ~[seatunnel-starter.jar:2.3.8-SNAPSHOT]
           at 
org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:681)
 ~[seatunnel-starter.jar:2.3.8-SNAPSHOT]
           at 
org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1003)
 ~[seatunnel-starter.jar:2.3.8-SNAPSHOT]
           at 
org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:39) 
~[seatunnel-starter.jar:2.3.8-SNAPSHOT]
           at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_412]
           at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
~[?:1.8.0_412]
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_412]
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_412]
           at java.lang.Thread.run(Thread.java:750) [?:1.8.0_412]
   Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received 
unexpected exception while polling the records
           at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
 ~[seatunnel-transforms-v2-2.3.7.jar:2.3.7]
           at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81)
 ~[seatunnel-transforms-v2-2.3.7.jar:2.3.7]
           ... 5 more
   Caused by: org.apache.kafka.connect.errors.DataException: source is not a 
valid field name
           at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254) 
~[?:?]
           at 
org.apache.kafka.connect.data.Struct.getCheckType(Struct.java:261) ~[?:?]
           at org.apache.kafka.connect.data.Struct.getStruct(Struct.java:191) 
~[?:?]
           at 
org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils.getTableId(SourceRecordUtils.java:118)
 ~[?:?]
           at 
org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher.shouldEmit(IncrementalSourceStreamFetcher.java:215)
 ~[?:?]
           at 
org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher.splitNormalStream(IncrementalSourceStreamFetcher.java:146)
 ~[?:?]
           at 
org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher.pollSplitRecords(IncrementalSourceStreamFetcher.java:135)
 ~[?:?]
           at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:75)
 ~[?:?]
           at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.FetchTask.run(FetchTask.java:54)
 ~[seatunnel-transforms-v2-2.3.7.jar:2.3.7]
           at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
 ~[seatunnel-transforms-v2-2.3.7.jar:2.3.7]
           at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81)
 ~[seatunnel-transforms-v2-2.3.7.jar:2.3.7]
           ... 5 more
   2024-09-13 19:36:48,598 INFO  [o.a.s.e.s.TaskExecutionService] 
[BlockingWorker-TaskGroupLocation{jobId=886872658728189957, pipelineId=1, 
taskGroupId=30000}] - [172.18.3.56]:5802 [aliyun-seatunnel-main] [5.1] 
taskDone, taskId = 40000, taskGroup = 
TaskGroupLocation{jobId=886872658728189957, pipelineId=1, taskGroupId=30000}
   2024-09-13 19:36:48,598 INFO  [o.a.s.e.s.TaskExecutionService] 
[BlockingWorker-TaskGroupLocation{jobId=886872658728189957, pipelineId=1, 
taskGroupId=30000}] - [172.18.3.56]:5802 [aliyun-seatunnel-main] [5.1] task 
40000 error with exception: [java.lang.RuntimeException: One or more fetchers 
have encountered exception], cancel other task in taskGroup 
TaskGroupLocation{jobId=886872658728189957, pipelineId=1, taskGroupId=30000}.
   2024-09-13 19:36:48,598 WARN  [o.a.s.e.s.TaskExecutionService] 
[BlockingWorker-TaskGroupLocation{jobId=886872658728189957, pipelineId=1, 
taskGroupId=30000}] - [172.18.3.56]:5802 [aliyun-seatunnel-main] [5.1] 
Interrupted task 50000 - 
org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask@142e1eb9
   2024-09-13 19:36:48,598 INFO  [o.a.s.e.s.TaskExecutionService] 
[BlockingWorker-TaskGroupLocation{jobId=886872658728189957, pipelineId=1, 
taskGroupId=30000}] - [172.18.3.56]:5802 [aliyun-seatunnel-main] [5.1] 
taskDone, taskId = 50000, taskGroup = 
TaskGroupLocation{jobId=886872658728189957, pipelineId=1, taskGroupId=30000}
   2024-09-13 19:36:48,598 INFO  [.c.c.DefaultClassLoaderService] 
[BlockingWorker-TaskGroupLocation{jobId=886872658728189957, pipelineId=1, 
taskGroupId=30000}] - Release classloader for job 886872658728189957 with jars 
[file:/data/seatunnel/apache-seatunnel-2.3.8-SNAPSHOT/connectors/connector-cdc-mongodb-2.3.8-SNAPSHOT.jar,
 
file:/data/seatunnel/apache-seatunnel-2.3.8-SNAPSHOT/connectors/seatunnel-transforms-v2-2.3.8-SNAPSHOT.jar,
 
file:/data/seatunnel/apache-seatunnel-2.3.8-SNAPSHOT/connectors/connector-starrocks-2.3.8-SNAPSHOT.jar]
   2024-09-13 19:36:48,604 INFO  [a.s.c.s.c.s.r.SourceReaderBase] 
[BlockingWorker-TaskGroupLocation{jobId=886872658728189957, pipelineId=1, 
taskGroupId=30000}] - Closing Source Reader 0.
   2024-09-13 19:36:48,607 INFO  [.c.c.DefaultClassLoaderService] 
[BlockingWorker-TaskGroupLocation{jobId=886872658728189957, pipelineId=1, 
taskGroupId=30000}] - recycle classloader for thread 
ForkJoinPool.commonPool-worker-1
   2024-09-13 19:36:48,607 INFO  [.c.c.DefaultClassLoaderService] 
[BlockingWorker-TaskGroupLocation{jobId=886872658728189957, pipelineId=1, 
taskGroupId=30000}] - recycle classloader for thread 
BlockingWorker-TaskGroupLocation{jobId=886872658728189957, pipelineId=1, 
taskGroupId=30000}
   2024-09-13 19:36:48,607 INFO  [.c.c.DefaultClassLoaderService] 
[BlockingWorker-TaskGroupLocation{jobId=886872658728189957, pipelineId=1, 
taskGroupId=30000}] - recycle classloader for thread 
hz.main.seaTunnel.task.thread-806
   2024-09-13 19:36:48,702 INFO  [o.a.s.e.s.TaskExecutionService] 
[BlockingWorker-TaskGroupLocation{jobId=886872658728189957, pipelineId=1, 
taskGroupId=30000}] - [172.18.3.56]:5802 [aliyun-seatunnel-main] [5.1] 
taskGroup TaskGroupLocation{jobId=886872658728189957, pipelineId=1, 
taskGroupId=30000} complete with FAILED
   ```
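   The root cause in the trace is `org.apache.kafka.connect.errors.DataException: source is not a valid field name`, thrown from `Struct.lookupField` while `SourceRecordUtils.getTableId` reads the `source` struct of a change record. That exception is exactly what the Kafka Connect `Struct` API throws when a record's value schema has no such field (e.g. a record that is not a regular data-change event). A minimal sketch reproducing the exception with the public `Struct` API (the `op`-only schema here is illustrative, not the actual MongoDB-CDC schema):

   ```java
   import org.apache.kafka.connect.data.Schema;
   import org.apache.kafka.connect.data.SchemaBuilder;
   import org.apache.kafka.connect.data.Struct;
   import org.apache.kafka.connect.errors.DataException;

   public class StructMissingFieldDemo {
       public static void main(String[] args) {
           // A value schema that lacks the "source" field, standing in for a
           // change-stream record that is not a normal data-change event.
           Schema valueSchema = SchemaBuilder.struct()
                   .field("op", Schema.STRING_SCHEMA)
                   .build();
           Struct value = new Struct(valueSchema).put("op", "c");

           try {
               // Mirrors SourceRecordUtils.getTableId reading the "source" struct.
               value.getStruct("source");
           } catch (DataException e) {
               // Same message as in the log above.
               System.out.println(e.getMessage());
           }
       }
   }
   ```

   If some records on the incremental stream (after the `startup.mode = timestamp` resume point) carry no `source` struct, `shouldEmit` would fail this way on the first such record, which would also explain why recovery with -r fails at the same offset.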
   
   
   ### Zeta or Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   openjdk version "1.8.0_352"
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   
