SonKi-Aurora opened a new issue, #7520: URL: https://github.com/apache/seatunnel/issues/7520
### Search before asking

- [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

While using MongoDB-CDC to sync data from MongoDB 3.6 to Doris 2.1, if the MongoDB data goes unchanged for a while and then changes again, the following exception is thrown.

### SeaTunnel Version

2.3.7

### SeaTunnel Config

```conf
env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 1000
}

source {
  MongoDB-CDC {
    hosts = "xx.xx.xx.xx:xx"
    database = ["xx"]
    collection = ["xx.xx"]
    username = xx
    password = xx
    heartbeat.interval.ms = 5000
    schema = {
      fields {
        "_id" : String,
        "source" : String,
        "outSideSource" : String,
        "type" : String,
        "contentType" : String,
        "name" : String,
        "searchName" : String,
        "originName" : String,
        "aliasName" : String,
        "timeLen" : String,
        "zone" : String,
        "language" : String,
        "years" : String,
        "premiereTime" : String,
        "worldPremiere" : String,
        "mainlandPremiere" : String,
        "youkuPremiere" : String,
        "director" : String,
        "scriptWriter" : String,
        "producer" : String,
        "superviser" : String,
        "leadingRole" : String,
        "presenter" : String,
        "associateProducer" : String,
        "otherCrew" : String,
        "openingTheme" : String,
        "endingTheme" : String,
        "episode" : String,
        "roleMapping" : String,
        "tags" : String,
        "poster" : String,
        "updateCycle" : String,
        "score" : String,
        "doubanScore" : String,
        "doubanDetailUrl" : String,
        "playCounts" : String,
        "hot" : String,
        "awards" : String,
        "tvStation" : String,
        "cpCode" : String,
        "cpName" : String,
        "shortDesc" : String,
        "detailsUrl" : String,
        "playUrl" : String,
        "imdbUrl" : String,
        "setCounts" : String,
        "desc" : String,
        "date" : String,
        "creatTime" : String,
        "lastModify" : String,
        "isUpdate" : String,
        "middlePosterAddr" : String,
        "smallPosterAddr" : String,
        "squarePosterAddr" : String,
        "horizontalPoster" : String,
        "isRelatedBase" : Int,
        "relationMode" : String,
        "isCanMakeBase" : Int,
        "checkId" : String,
        "dataProvider" : String,
        "province" : String,
        "provinceEpgIds" : String,
        "isVip" : String,
        "isChecked" : String,
        "jiShu" : String,
        "outSourceId" : String,
        "isCharge" : String,
        "partnerCode" : String,
        "danMuFlag" : Int,
        "bulletCreateDate" : String,
        "commentFlag" : Int,
        "commentCreateDate" : String,
        "elementType" : String,
        "hasHighLight" : String,
        "highLightCreateDate" : String,
        "hasProgram" : String,
        "programCreateDate" : String,
        "hasStartEndPoint" : String,
        "startEndPointCreateDate" : String,
        "yinHeType" : String,
        "statusCode" : String,
        "publisher" : String,
        "updateStatus" : String,
        "isAlone" : String,
        "finishSpiderProgram" : String,
        "hasShotdetRecord" : String
      }
    }
  }
}

sink {
  Doris {
    fenodes = "xx.xx.xx.xx:xx"
    username = xx
    password = "xx"
    table.identifier = "xx.xx"
    sink.label-prefix = "xx_"
    sink.enable-2pc = "true"
    sink.enable-delete = "true"
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}
```

### Running Command

```shell
./bin/seatunnel.sh --config ./config/xx.config --async -n xx
```

### Error Exception

```log
2024-08-29 11:10:10,595 WARN [c.m.s.f.MongodbStreamFetchTask] [debezium-reader-0] - Cannot extract clusterTime from change stream event, fallback to current timestamp.
2024-08-29 11:10:11,166 INFO [o.a.s.c.d.s.w.RecordBuffer ] [st-multi-table-sink-writer-2] - start buffer data, read queue size 0, write queue size 3
2024-08-29 11:10:11,166 WARN [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - [10.103.22.10]:5802 [seatunnel] [5.1] Exception in org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask@2775d124
java.lang.NumberFormatException: null
    at java.lang.Long.parseLong(Long.java:552) ~[?:1.8.0_381]
    at java.lang.Long.parseLong(Long.java:631) ~[?:1.8.0_381]
    at org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffset.getTimestamp(ChangeStreamOffset.java:70) ~[?:?]
    at org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffset.compareTo(ChangeStreamOffset.java:92) ~[?:?]
    at org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffset.compareTo(ChangeStreamOffset.java:37) ~[?:?]
    at org.apache.seatunnel.connectors.cdc.base.source.offset.Offset.isAtOrAfter(Offset.java:71) ~[?:?]
    at org.apache.seatunnel.connectors.cdc.base.source.split.state.IncrementalSplitState.markEnterPureIncrementPhaseIfNeed(IncrementalSplitState.java:82) ~[?:?]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.markEnterPureIncrementPhase(IncrementalSourceRecordEmitter.java:164) ~[?:?]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:102) ~[?:?]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:61) ~[?:?]
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:110) ~[?:?]
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119) ~[?:?]
    at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:156) ~[seatunnel-starter.jar:2.3.7]
    at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127) ~[seatunnel-starter.jar:2.3.7]
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168) ~[seatunnel-starter.jar:2.3.7]
    at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132) ~[seatunnel-starter.jar:2.3.7]
    at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:717) ~[seatunnel-starter.jar:2.3.7]
    at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1039) ~[seatunnel-starter.jar:2.3.7]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_381]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_381]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_381]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_381]
    at java.lang.Thread.run(Thread.java:750) [?:1.8.0_381]
2024-08-29 11:10:11,166 INFO [o.a.s.c.d.s.w.DorisStreamLoad ] [st-multi-table-sink-writer-2] - stream load started for sourcePS__vgs_sourcePS_881361616757587971_0_2619
2024-08-29 11:10:11,166 INFO [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - [10.103.22.10]:5802 [seatunnel] [5.1] taskDone, taskId = 40000, taskGroup = TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}
2024-08-29 11:10:11,166 INFO [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - [10.103.22.10]:5802 [seatunnel] [5.1] task 40000 error with exception: [java.lang.NumberFormatException: null], cancel other task in taskGroup TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}.
2024-08-29 11:10:11,166 INFO [o.a.s.c.d.s.w.DorisStreamLoad ] [stream-load-upload] - start execute load
2024-08-29 11:10:11,166 WARN [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - [10.103.22.10]:5802 [seatunnel] [5.1] Interrupted task 50000 - org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask@32b029ac
2024-08-29 11:10:11,166 INFO [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - [10.103.22.10]:5802 [seatunnel] [5.1] taskDone, taskId = 50000, taskGroup = TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}
2024-08-29 11:10:11,167 INFO [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - Release classloader for job 881361616757587971 with jars [file:/data/seatunnel/apache-seatunnel/connectors/connector-cdc-mongodb-2.3.7.jar, file:/data/seatunnel/apache-seatunnel/connectors/connector-doris-2.3.7.jar]
2024-08-29 11:10:11,167 INFO [a.s.c.s.c.s.r.SourceReaderBase] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - Closing Source Reader 0.
2024-08-29 11:10:11,170 INFO [o.a.s.c.s.c.s.r.f.SplitFetcher] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - Shutting down split fetcher 0
2024-08-29 11:10:11,180 INFO [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - recycle classloader for thread st-multi-table-sink-writer-2
2024-08-29 11:10:11,180 INFO [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - recycle classloader for thread hz.main.seaTunnel.task.thread-71
2024-08-29 11:10:11,180 INFO [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - recycle classloader for thread ForkJoinPool.commonPool-worker-0
2024-08-29 11:10:11,180 INFO [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - recycle classloader for thread stream-load-upload
2024-08-29 11:10:11,180 INFO [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - recycle classloader for thread st-multi-table-sink-writer-1
2024-08-29 11:10:11,180 INFO [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - recycle classloader for thread stream-load-check
2024-08-29 11:10:11,180 INFO [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - recycle classloader for thread BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}
2024-08-29 11:10:11,185 INFO [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - [10.103.22.10]:5802 [seatunnel] [5.1] taskGroup TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000} complete with FAILED
2024-08-29 11:10:11,185 INFO [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - [10.103.22.10]:5802 [seatunnel] [5.1] task 50000 error with exception: [java.lang.NumberFormatException: null], cancel other task in taskGroup TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}.
2024-08-29 11:10:11,185 INFO [o.a.s.e.s.TaskExecutionService] [hz.main.seaTunnel.task.thread-71] - [10.103.22.10]:5802 [seatunnel] [5.1] Task TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000} complete with state FAILED
2024-08-29 11:10:11,187 INFO [o.a.h.i.e.RetryExec ] [stream-load-upload] - I/O exception (java.net.SocketException) caught when processing request to {}->http://10.103.22.20:8040: Socket is closed
2024-08-29 11:10:11,187 INFO [o.a.h.i.e.RetryExec ] [stream-load-upload] - Retrying request to {}->http://10.103.22.20:8040
2024-08-29 11:10:11,189 INFO [a.s.e.s.s.s.DefaultSlotService] [hz.main.generic-operation.thread-45] - received slot release request, jobID: 881361616757587971, slot: SlotProfile{worker=[10.103.22.10]:5802, slotID=37, ownerJobID=881361616757587971, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='12893983-3196-4808-a2ed-22de231e4ba6'}
2024-08-29 11:10:11,191 INFO [o.a.s.e.s.TaskExecutionService] [hz.main.generic-operation.thread-5] - [10.103.22.10]:5802 [seatunnel] [5.1] Task (TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1}) need cancel.
2024-08-29 11:10:11,192 WARN [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1}] - [10.103.22.10]:5802 [seatunnel] [5.1] Interrupted task 20000 - org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask@228a8a17
2024-08-29 11:10:11,192 INFO [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1}] - [10.103.22.10]:5802 [seatunnel] [5.1] taskDone, taskId = 20000, taskGroup = TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1}
2024-08-29 11:10:11,192 INFO [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1}] - Release classloader for job 881361616757587971 with jars [file:/data/seatunnel/apache-seatunnel/connectors/connector-cdc-mongodb-2.3.7.jar]
2024-08-29 11:10:11,197 INFO [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1}] - recycle classloader for thread MaintenanceTimer-2-thread-1
2024-08-29 11:10:11,197 INFO [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1}] - recycle classloader for thread BufferPoolPruner-1-thread-1
2024-08-29 11:10:11,197 INFO [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1}] - recycle classloader for thread cluster-ClusterId{value='66cfd9f81461db4b6c7a5a33', description='null'}-10.103.12.39:27017
2024-08-29 11:10:11,197 INFO [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1}] - recycle classloader for thread BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1}
2024-08-29 11:10:11,197 INFO [.c.c.DefaultClassLoaderService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1}] - recycle classloader for thread cluster-rtt-ClusterId{value='66cfd9f81461db4b6c7a5a33', description='null'}-10.103.12.39:27017
2024-08-29 11:10:11,201 INFO [o.a.s.e.s.TaskExecutionService] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1}] - [10.103.22.10]:5802 [seatunnel] [5.1] taskGroup TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1} complete with CANCELED
2024-08-29 11:10:11,201 INFO [.e.IncrementalSourceEnumerator] [BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1}] - Closing enumerator...
2024-08-29 11:10:11,201 INFO [o.a.s.e.s.TaskExecutionService] [hz.main.seaTunnel.task.thread-71] - [10.103.22.10]:5802 [seatunnel] [5.1] Task TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1} complete with state CANCELED
2024-08-29 11:10:13,141 INFO [r.IncrementalSourceSplitReader] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - Close current fetcher org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher
2024-08-29 11:10:13,596 INFO [c.m.k.c.s.h.HeartbeatManager ] [debezium-reader-0] - Generating heartbeat event. {"_data": {"$binary": {"base64": "gmbP5pIAAAABRjxfaWQAPDRkMDA4ODRjMmYyYTI0MWJkNzAwY2ViODZUVlNPVQAAWhAEI6PeJyh4Q4WSkVMWgbWnSgQ=", "subType": "00"}}}
2024-08-29 11:10:13,597 INFO [o.a.s.c.s.c.s.r.f.SplitFetcher] [Source Data Fetcher for BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000}] - Split fetcher 0 exited.
2024-08-29 11:10:14,207 INFO [a.s.e.s.s.s.DefaultSlotService] [hz.main.generic-operation.thread-19] - received slot release request, jobID: 881361616757587971, slot: SlotProfile{worker=[10.103.22.10]:5802, slotID=36, ownerJobID=881361616757587971, assigned=true, resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, sequence='12893983-3196-4808-a2ed-22de231e4ba6'}
```

### Zeta or Flink or Spark Version

Zeta 2.3.7

### Java or Scala Version

Java 8

### Screenshots

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
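
For reference, the top frames of the stack trace in the Error Exception section (`Long.parseLong` called from `ChangeStreamOffset.getTimestamp`) are consistent with a timestamp value being `null` when the offset is compared. The following is a minimal sketch of that failure mode only, assuming the offset is kept as a string-keyed map and that the timestamp entry can be absent. The class name, method names, and the `"timestamp"` key are hypothetical and are not taken from the SeaTunnel source.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an offset that stores its fields as strings.
public class OffsetTimestampSketch {
    // Assumed key name, for illustration only.
    static final String TIMESTAMP_KEY = "timestamp";

    // Parses whatever the map holds. Long.parseLong(null) throws
    // NumberFormatException, which matches the exception type in the log.
    static long unsafeTimestamp(Map<String, String> offset) {
        return Long.parseLong(offset.get(TIMESTAMP_KEY));
    }

    // Null-safe variant: returns the caller-supplied fallback when the key is absent.
    static long safeTimestamp(Map<String, String> offset, long fallback) {
        String raw = offset.get(TIMESTAMP_KEY);
        return raw == null ? fallback : Long.parseLong(raw);
    }

    public static void main(String[] args) {
        // An offset that carries a resume token but no timestamp entry.
        Map<String, String> offset = new HashMap<>();
        offset.put("resumeToken", "{}");

        try {
            unsafeTimestamp(offset);
        } catch (NumberFormatException e) {
            System.out.println("unsafe lookup failed: " + e.getClass().getName());
        }
        System.out.println("safe lookup returned: " + safeTimestamp(offset, 0L));
    }
}
```

Whether a fallback value is the right behaviour for the connector is a separate question; the sketch only shows why a missing timestamp would surface as a `NumberFormatException` rather than a more descriptive error.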
