SonKi-Aurora opened a new issue, #7520:
URL: https://github.com/apache/seatunnel/issues/7520

   ### Search before asking
   
   - [X] I had searched in the 
[issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22)
 and found no similar issues.
   
   
   ### What happened
   
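   在使用 MongoDB-CDC 将 MongoDB 3.6 的数据同步到 Doris 2.1 时，如果 MongoDB 在一段时间内没有任何数据变化，之后再次产生数据变化时，作业会抛出下列异常。
   
   (English: While using the MongoDB-CDC source to sync data from MongoDB 3.6 to Doris 2.1, if the MongoDB collection has no data changes for a while, the job fails with the exception below as soon as changes resume. The failure is immediately preceded by the warning "Cannot extract clusterTime from change stream event, fallback to current timestamp.", and the exception is a `NumberFormatException: null` thrown from `ChangeStreamOffset.getTimestamp`.)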
   
   ### SeaTunnel Version
   
   2.3.7
   
   ### SeaTunnel Config
   
   ```conf
   env {
     # You can set engine configuration here
     parallelism = 1
     job.mode = "STREAMING"
     checkpoint.interval = 1000
   }
   
   source {
     MongoDB-CDC {
       hosts = "xx.xx.xx.xx:xx"
       database = ["xx"]
       collection = ["xx.xx"]
       username = xx
       password = xx
       heartbeat.interval.ms = 5000
       schema = {
         fields {
           "_id" : String,
           "source" : String,
           "outSideSource" : String,
           "type" : String,
           "contentType" : String,
           "name" : String,
           "searchName" : String,
           "originName" : String,
           "aliasName" : String,
           "timeLen" : String,
           "zone" : String,
           "language" : String,
           "years" : String,
           "premiereTime" : String,
           "worldPremiere" : String,
           "mainlandPremiere" : String,
           "youkuPremiere" : String,
           "director" : String,
           "scriptWriter" : String,
           "producer" : String,
           "superviser" : String,
           "leadingRole" : String,
           "presenter" : String,
           "associateProducer" : String,
           "otherCrew" : String,
           "openingTheme" : String,
           "endingTheme" : String,
           "episode" : String,
           "roleMapping" : String,
           "tags" : String,
           "poster" : String,
           "updateCycle" : String,
           "score" : String,
           "doubanScore" : String,
           "doubanDetailUrl" : String,
           "playCounts" : String,
           "hot" : String,
           "awards" : String,
           "tvStation" : String,
           "cpCode" : String,
           "cpName" : String,
           "shortDesc" : String,
           "detailsUrl" : String,
           "playUrl" : String,
           "imdbUrl" : String,
           "setCounts" : String,
           "desc" : String,
           "date" : String,
           "creatTime" : String,
           "lastModify" : String,
           "isUpdate" : String,
           "middlePosterAddr" : String,
           "smallPosterAddr" : String,
           "squarePosterAddr" : String,
           "horizontalPoster" : String,
           "isRelatedBase" : Int,
           "relationMode" : String,
           "isCanMakeBase" : Int,
           "checkId" : String,
           "dataProvider" : String,
           "province" : String,
           "provinceEpgIds" : String,
           "isVip" : String,
           "isChecked" : String,
           "jiShu" : String,
           "outSourceId" : String,
           "isCharge" : String,
           "partnerCode" : String,
           "danMuFlag" : Int,
           "bulletCreateDate" : String,
           "commentFlag" : Int,
           "commentCreateDate" : String,
           "elementType" : String,
           "hasHighLight" : String,
           "highLightCreateDate" : String,
           "hasProgram" : String,
           "programCreateDate" : String,
           "hasStartEndPoint" : String,
           "startEndPointCreateDate" : String,
           "yinHeType" : String,
           "statusCode" : String,
           "publisher" : String,
           "updateStatus" : String,
           "isAlone" : String,
           "finishSpiderProgram" : String,
           "hasShotdetRecord" : String
         }
       }
     }
   }
   
   sink {
     Doris {
       fenodes = "xx.xx.xx.xx:xx"
       username = xx
       password = "xx"
       table.identifier = "xx.xx"
       sink.label-prefix = "xx_"
       sink.enable-2pc = "true"
       sink.enable-delete = "true"
       doris.config {
         format = "json"
         read_json_by_line = "true"
       }
     }
   }
   ```
   
   
   ### Running Command
   
   ```shell
   ./bin/seatunnel.sh --config ./config/xx.config --async -n xx
   ```
   
   
   ### Error Exception
   
   ```log
   2024-08-29 11:10:10,595 WARN  [c.m.s.f.MongodbStreamFetchTask] 
[debezium-reader-0] - Cannot extract clusterTime from change stream event, 
fallback to current timestamp.
   2024-08-29 11:10:11,166 INFO  [o.a.s.c.d.s.w.RecordBuffer    ] 
[st-multi-table-sink-writer-2] - start buffer data, read queue size 0, write 
queue size 3
   2024-08-29 11:10:11,166 WARN  [o.a.s.e.s.TaskExecutionService] 
[BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=30000}] - [10.103.22.10]:5802 [seatunnel] [5.1] Exception in 
org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask@2775d124
   java.lang.NumberFormatException: null
           at java.lang.Long.parseLong(Long.java:552) ~[?:1.8.0_381]
           at java.lang.Long.parseLong(Long.java:631) ~[?:1.8.0_381]
           at 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffset.getTimestamp(ChangeStreamOffset.java:70)
 ~[?:?]
           at 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffset.compareTo(ChangeStreamOffset.java:92)
 ~[?:?]
           at 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.offset.ChangeStreamOffset.compareTo(ChangeStreamOffset.java:37)
 ~[?:?]
           at 
org.apache.seatunnel.connectors.cdc.base.source.offset.Offset.isAtOrAfter(Offset.java:71)
 ~[?:?]
           at 
org.apache.seatunnel.connectors.cdc.base.source.split.state.IncrementalSplitState.markEnterPureIncrementPhaseIfNeed(IncrementalSplitState.java:82)
 ~[?:?]
           at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.markEnterPureIncrementPhase(IncrementalSourceRecordEmitter.java:164)
 ~[?:?]
           at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:102)
 ~[?:?]
           at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:61)
 ~[?:?]
           at 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:110)
 ~[?:?]
           at 
org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
 ~[?:?]
           at 
org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:156)
 ~[seatunnel-starter.jar:2.3.7]
           at 
org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
 ~[seatunnel-starter.jar:2.3.7]
           at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168)
 ~[seatunnel-starter.jar:2.3.7]
           at 
org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
 ~[seatunnel-starter.jar:2.3.7]
           at 
org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:717)
 ~[seatunnel-starter.jar:2.3.7]
           at 
org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1039)
 ~[seatunnel-starter.jar:2.3.7]
           at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_381]
           at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
~[?:1.8.0_381]
           at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_381]
           at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_381]
           at java.lang.Thread.run(Thread.java:750) [?:1.8.0_381]
   2024-08-29 11:10:11,166 INFO  [o.a.s.c.d.s.w.DorisStreamLoad ] 
[st-multi-table-sink-writer-2] - stream load started for 
sourcePS__vgs_sourcePS_881361616757587971_0_2619
   2024-08-29 11:10:11,166 INFO  [o.a.s.e.s.TaskExecutionService] 
[BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=30000}] - [10.103.22.10]:5802 [seatunnel] [5.1] taskDone, taskId = 
40000, taskGroup = TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=30000}
   2024-08-29 11:10:11,166 INFO  [o.a.s.e.s.TaskExecutionService] 
[BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=30000}] - [10.103.22.10]:5802 [seatunnel] [5.1] task 40000 error 
with exception: [java.lang.NumberFormatException: null], cancel other task in 
taskGroup TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=30000}.
   2024-08-29 11:10:11,166 INFO  [o.a.s.c.d.s.w.DorisStreamLoad ] 
[stream-load-upload] - start execute load
   2024-08-29 11:10:11,166 WARN  [o.a.s.e.s.TaskExecutionService] 
[BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=30000}] - [10.103.22.10]:5802 [seatunnel] [5.1] Interrupted task 
50000 - org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask@32b029ac
   2024-08-29 11:10:11,166 INFO  [o.a.s.e.s.TaskExecutionService] 
[BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=30000}] - [10.103.22.10]:5802 [seatunnel] [5.1] taskDone, taskId = 
50000, taskGroup = TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=30000}
   2024-08-29 11:10:11,167 INFO  [.c.c.DefaultClassLoaderService] 
[BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=30000}] - Release classloader for job 881361616757587971 with jars 
[file:/data/seatunnel/apache-seatunnel/connectors/connector-cdc-mongodb-2.3.7.jar,
 file:/data/seatunnel/apache-seatunnel/connectors/connector-doris-2.3.7.jar]
   2024-08-29 11:10:11,167 INFO  [a.s.c.s.c.s.r.SourceReaderBase] 
[BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=30000}] - Closing Source Reader 0.
   2024-08-29 11:10:11,170 INFO  [o.a.s.c.s.c.s.r.f.SplitFetcher] 
[BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=30000}] - Shutting down split fetcher 0
   2024-08-29 11:10:11,180 INFO  [.c.c.DefaultClassLoaderService] 
[BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=30000}] - recycle classloader for thread 
st-multi-table-sink-writer-2
   2024-08-29 11:10:11,180 INFO  [.c.c.DefaultClassLoaderService] 
[BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=30000}] - recycle classloader for thread 
hz.main.seaTunnel.task.thread-71
   2024-08-29 11:10:11,180 INFO  [.c.c.DefaultClassLoaderService] 
[BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=30000}] - recycle classloader for thread 
ForkJoinPool.commonPool-worker-0
   2024-08-29 11:10:11,180 INFO  [.c.c.DefaultClassLoaderService] 
[BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=30000}] - recycle classloader for thread stream-load-upload
   2024-08-29 11:10:11,180 INFO  [.c.c.DefaultClassLoaderService] 
[BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=30000}] - recycle classloader for thread 
st-multi-table-sink-writer-1
   2024-08-29 11:10:11,180 INFO  [.c.c.DefaultClassLoaderService] 
[BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=30000}] - recycle classloader for thread stream-load-check
   2024-08-29 11:10:11,180 INFO  [.c.c.DefaultClassLoaderService] 
[BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=30000}] - recycle classloader for thread 
BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=30000}
   2024-08-29 11:10:11,185 INFO  [o.a.s.e.s.TaskExecutionService] 
[BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=30000}] - [10.103.22.10]:5802 [seatunnel] [5.1] taskGroup 
TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000} 
complete with FAILED
   2024-08-29 11:10:11,185 INFO  [o.a.s.e.s.TaskExecutionService] 
[BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=30000}] - [10.103.22.10]:5802 [seatunnel] [5.1] task 50000 error 
with exception: [java.lang.NumberFormatException: null], cancel other task in 
taskGroup TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=30000}.
   2024-08-29 11:10:11,185 INFO  [o.a.s.e.s.TaskExecutionService] 
[hz.main.seaTunnel.task.thread-71] - [10.103.22.10]:5802 [seatunnel] [5.1] Task 
TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=30000} 
complete with state FAILED
   2024-08-29 11:10:11,187 INFO  [o.a.h.i.e.RetryExec           ] 
[stream-load-upload] - I/O exception (java.net.SocketException) caught when 
processing request to {}->http://10.103.22.20:8040: Socket is closed
   2024-08-29 11:10:11,187 INFO  [o.a.h.i.e.RetryExec           ] 
[stream-load-upload] - Retrying request to {}->http://10.103.22.20:8040
   2024-08-29 11:10:11,189 INFO  [a.s.e.s.s.s.DefaultSlotService] 
[hz.main.generic-operation.thread-45] - received slot release request, jobID: 
881361616757587971, slot: SlotProfile{worker=[10.103.22.10]:5802, slotID=37, 
ownerJobID=881361616757587971, assigned=true, 
resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, 
sequence='12893983-3196-4808-a2ed-22de231e4ba6'}
   2024-08-29 11:10:11,191 INFO  [o.a.s.e.s.TaskExecutionService] 
[hz.main.generic-operation.thread-5] - [10.103.22.10]:5802 [seatunnel] [5.1] 
Task (TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1}) 
need cancel.
   2024-08-29 11:10:11,192 WARN  [o.a.s.e.s.TaskExecutionService] 
[BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=1}] - [10.103.22.10]:5802 [seatunnel] [5.1] Interrupted task 20000 
- org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask@228a8a17
   2024-08-29 11:10:11,192 INFO  [o.a.s.e.s.TaskExecutionService] 
[BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=1}] - [10.103.22.10]:5802 [seatunnel] [5.1] taskDone, taskId = 
20000, taskGroup = TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=1}
   2024-08-29 11:10:11,192 INFO  [.c.c.DefaultClassLoaderService] 
[BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=1}] - Release classloader for job 881361616757587971 with jars 
[file:/data/seatunnel/apache-seatunnel/connectors/connector-cdc-mongodb-2.3.7.jar]
   2024-08-29 11:10:11,197 INFO  [.c.c.DefaultClassLoaderService] 
[BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=1}] - recycle classloader for thread MaintenanceTimer-2-thread-1
   2024-08-29 11:10:11,197 INFO  [.c.c.DefaultClassLoaderService] 
[BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=1}] - recycle classloader for thread BufferPoolPruner-1-thread-1
   2024-08-29 11:10:11,197 INFO  [.c.c.DefaultClassLoaderService] 
[BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=1}] - recycle classloader for thread 
cluster-ClusterId{value='66cfd9f81461db4b6c7a5a33', 
description='null'}-10.103.12.39:27017
   2024-08-29 11:10:11,197 INFO  [.c.c.DefaultClassLoaderService] 
[BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=1}] - recycle classloader for thread 
BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=1}
   2024-08-29 11:10:11,197 INFO  [.c.c.DefaultClassLoaderService] 
[BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=1}] - recycle classloader for thread 
cluster-rtt-ClusterId{value='66cfd9f81461db4b6c7a5a33', 
description='null'}-10.103.12.39:27017
   2024-08-29 11:10:11,201 INFO  [o.a.s.e.s.TaskExecutionService] 
[BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=1}] - [10.103.22.10]:5802 [seatunnel] [5.1] taskGroup 
TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1} 
complete with CANCELED
   2024-08-29 11:10:11,201 INFO  [.e.IncrementalSourceEnumerator] 
[BlockingWorker-TaskGroupLocation{jobId=881361616757587971, pipelineId=1, 
taskGroupId=1}] - Closing enumerator...
   2024-08-29 11:10:11,201 INFO  [o.a.s.e.s.TaskExecutionService] 
[hz.main.seaTunnel.task.thread-71] - [10.103.22.10]:5802 [seatunnel] [5.1] Task 
TaskGroupLocation{jobId=881361616757587971, pipelineId=1, taskGroupId=1} 
complete with state CANCELED
   2024-08-29 11:10:13,141 INFO  [r.IncrementalSourceSplitReader] [Source Data 
Fetcher for BlockingWorker-TaskGroupLocation{jobId=881361616757587971, 
pipelineId=1, taskGroupId=30000}] - Close current fetcher 
org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher
   2024-08-29 11:10:13,596 INFO  [c.m.k.c.s.h.HeartbeatManager  ] 
[debezium-reader-0] - Generating heartbeat event. {"_data": {"$binary": 
{"base64": 
"gmbP5pIAAAABRjxfaWQAPDRkMDA4ODRjMmYyYTI0MWJkNzAwY2ViODZUVlNPVQAAWhAEI6PeJyh4Q4WSkVMWgbWnSgQ=",
 "subType": "00"}}}
   2024-08-29 11:10:13,597 INFO  [o.a.s.c.s.c.s.r.f.SplitFetcher] [Source Data 
Fetcher for BlockingWorker-TaskGroupLocation{jobId=881361616757587971, 
pipelineId=1, taskGroupId=30000}] - Split fetcher 0 exited.
   2024-08-29 11:10:14,207 INFO  [a.s.e.s.s.s.DefaultSlotService] 
[hz.main.generic-operation.thread-19] - received slot release request, jobID: 
881361616757587971, slot: SlotProfile{worker=[10.103.22.10]:5802, slotID=36, 
ownerJobID=881361616757587971, assigned=true, 
resourceProfile=ResourceProfile{cpu=CPU{core=0}, heapMemory=Memory{bytes=0}}, 
sequence='12893983-3196-4808-a2ed-22de231e4ba6'}
   ```
   
   
   ### Zeta or Flink or Spark Version
   
   Zeta 2.3.7
   
   ### Java or Scala Version
   
   Java 8
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to