zyhui98 commented on issue #10084:
URL: https://github.com/apache/seatunnel/issues/10084#issuecomment-3551272068

   Changing the field to the string type works, but synchronizing two or more tables to Kafka fails with a serialization error:
   
   `[1043430649521766401] 2025-11-19 15:40:58,319 ERROR org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job (1043430649521766401), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-MongoDB-CDC]-SourceTask (1/1)], taskGroupLocation: [TaskGroupLocation{jobId=1043430649521766401, pipelineId=1, taskGroupId=2}] end with state FAILED and Exception: java.lang.RuntimeException: org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: ErrorCode:[COMMON-02], ErrorDescription:[Common JSON convert/parse 'SeaTunnelRow{tableId=tw_game.user_base, kind=+I, fields=[68d533feffce9063b86a6971, A00001_delete_at_2025-09-26_11:00:05, 1758802942535, 1758803136220, 1758802942535, 2, 1203294123, null, null, null]}' operation failed.]
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:303)
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:70)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
        at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:82)
        at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:56)
        at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)
        at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:72)
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
        at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:77)
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:679)
        at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1008)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
        at java.util.concurrent.FutureTask.run(FutureTask.java)
        at --- Async.Stack.Trace --- (captured by IntelliJ IDEA debugger)
        at java.util.concurrent.FutureTask.<init>(FutureTask.java:151)
        at java.util.concurrent.AbstractExecutorService.newTaskFor(AbstractExecutorService.java:87)
        at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:111)
        at org.apache.seatunnel.api.tracing.MDCExecutorService.submit(MDCExecutorService.java:78)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
        at org.apache.seatunnel.engine.server.TaskExecutionService.submitBlockingTask(TaskExecutionService.java:255)
        at org.apache.seatunnel.engine.server.TaskExecutionService.deployLocalTask(TaskExecutionService.java:405)
        at org.apache.seatunnel.engine.server.TaskExecutionService.deployTask(TaskExecutionService.java:340)
        at org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.lambda$deployOnLocal$0(PhysicalVertex.java:269)
        at org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.deployInternal(PhysicalVertex.java:324)
        at org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.deployOnLocal(PhysicalVertex.java:263)
        at org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.deploy(PhysicalVertex.java:311)
        at org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.stateProcess(PhysicalVertex.java:558)
        at org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.updateTaskState(PhysicalVertex.java:390)
        at org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex.makeTaskGroupDeploy(PhysicalVertex.java:304)
        at org.apache.seatunnel.engine.server.dag.physical.SubPlan.lambda$stateProcess$21(SubPlan.java:685)
        at java.util.ArrayList.forEach(ArrayList.java:1257)
        at org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:681)
        at org.apache.seatunnel.engine.server.dag.physical.SubPlan.updatePipelineState(SubPlan.java:383)
        at org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:667)
        at org.apache.seatunnel.engine.server.dag.physical.SubPlan.updatePipelineState(SubPlan.java:383)
        at org.apache.seatunnel.engine.server.dag.physical.SubPlan.stateProcess(SubPlan.java:636)
        at org.apache.seatunnel.engine.server.dag.physical.SubPlan.startSubPlanStateProcess(SubPlan.java:619)
        at org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan.lambda$stateProcess$3(PhysicalPlan.java:337)
        at java.util.ArrayList.forEach(ArrayList.java:1257)
        at org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan.stateProcess(PhysicalPlan.java:333)
        at org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan.updateJobState(PhysicalPlan.java:282)
        at org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan.startJob(PhysicalPlan.java:311)
        at org.apache.seatunnel.engine.server.master.JobMaster.run(JobMaster.java:556)
        at org.apache.seatunnel.engine.server.CoordinatorService.lambda$pendingJobSchedule$1(CoordinatorService.java:329)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
        at java.util.concurrent.FutureTask.run(FutureTask.java)
        at --- Async.Stack.Trace --- (captured by IntelliJ IDEA debugger)
        at java.util.concurrent.FutureTask.<init>(FutureTask.java:151)
        at java.util.concurrent.AbstractExecutorService.newTaskFor(AbstractExecutorService.java:87)
        at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:111)
        at org.apache.seatunnel.api.tracing.MDCExecutorService.submit(MDCExecutorService.java:78)
        at org.apache.seatunnel.engine.server.CoordinatorService.pendingJobSchedule(CoordinatorService.java:311)
        at org.apache.seatunnel.engine.server.CoordinatorService.lambda$startPendingJobScheduleThread$0(CoordinatorService.java:241)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
        at java.util.concurrent.FutureTask.run(FutureTask.java)
        at --- Async.Stack.Trace --- (captured by IntelliJ IDEA debugger)
        at java.util.concurrent.FutureTask.<init>(FutureTask.java:151)
        at java.util.concurrent.AbstractExecutorService.newTaskFor(AbstractExecutorService.java:87)
        at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:111)
        at org.apache.seatunnel.engine.server.CoordinatorService.startPendingJobScheduleThread(CoordinatorService.java:255)
        at org.apache.seatunnel.engine.server.CoordinatorService.<init>(CoordinatorService.java:232)
        at org.apache.seatunnel.engine.server.SeaTunnelServer.startMaster(SeaTunnelServer.java:190)
        at org.apache.seatunnel.engine.server.SeaTunnelServer.init(SeaTunnelServer.java:155)
        at com.hazelcast.spi.impl.servicemanager.impl.ServiceManagerImpl.initService(ServiceManagerImpl.java:235)
        at com.hazelcast.spi.impl.servicemanager.impl.ServiceManagerImpl.initServices(ServiceManagerImpl.java:211)
        at com.hazelcast.spi.impl.servicemanager.impl.ServiceManagerImpl.start(ServiceManagerImpl.java:103)
        at com.hazelcast.spi.impl.NodeEngineImpl.start(NodeEngineImpl.java:250)
        at com.hazelcast.instance.impl.Node.start(Node.java:458)
        at com.hazelcast.instance.impl.HazelcastInstanceImpl.<init>(HazelcastInstanceImpl.java:124)
        at com.hazelcast.instance.impl.HazelcastInstanceFactory.constructHazelcastInstance(HazelcastInstanceFactory.java:217)
        at com.hazelcast.instance.impl.HazelcastInstanceFactory.newHazelcastInstance(HazelcastInstanceFactory.java:196)
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.createServerInLocal(ClientExecuteCommand.java:284)
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:98)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.example.engine.SeaTunnelEngineLocalExample.main(SeaTunnelEngineLocalExample.java:51)
   Caused by: org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: ErrorCode:[COMMON-02], ErrorDescription:[Common JSON convert/parse 'SeaTunnelRow{tableId=tw_game.user_base, kind=+I, fields=[68d533feffce9063b86a6971, A00001_delete_at_2025-09-26_11:00:05, 1758802942535, 1758803136220, 1758802942535, 2, 1203294123, null, null, null]}' operation failed.]
        at org.apache.seatunnel.common.exception.CommonError.jsonOperationError(CommonError.java:207)
        at org.apache.seatunnel.format.json.JsonSerializationSchema.serialize(JsonSerializationSchema.java:84)
        at org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer.lambda$valueExtractor$13(DefaultSeaTunnelRowSerializer.java:256)
        at org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer.serializeRow(DefaultSeaTunnelRowSerializer.java:80)
        at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkWriter.write(KafkaSinkWriter.java:126)
        at org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkWriter.write(KafkaSinkWriter.java:71)
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:269)
        ... 88 more
   Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
        at org.apache.seatunnel.format.json.RowToJsonConverters$12.convert(RowToJsonConverters.java:159)
        at org.apache.seatunnel.format.json.RowToJsonConverters$1.convert(RowToJsonConverters.java:73)
        at org.apache.seatunnel.format.json.RowToJsonConverters$18.convert(RowToJsonConverters.java:235)
        at org.apache.seatunnel.format.json.RowToJsonConverters$1.convert(RowToJsonConverters.java:73)
        at org.apache.seatunnel.format.json.JsonSerializationSchema.serialize(JsonSerializationSchema.java:81)
        ... 93 more`
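
The innermost `Caused by` shows a `Long` value reaching a converter that casts its input to `String`. One possible explanation, which is only an assumption and is not confirmed by the trace, is that with several tables the JSON serializer ends up using a field type from a different schema than the one the row actually belongs to. The sketch below reproduces only that failure mode in plain Java. `SchemaMismatchSketch`, `STRING_FIELD`, `LENIENT_FIELD` and `convertRow` are hypothetical names for illustration and are not part of the SeaTunnel API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class SchemaMismatchSketch {

    // Hypothetical converter chosen because the declared column type is STRING.
    // It casts blindly, so any non-String value fails at runtime.
    static final Function<Object, String> STRING_FIELD = value -> (String) value;

    // Hypothetical lenient variant: null stays null, everything else is stringified.
    static final Function<Object, String> LENIENT_FIELD =
            value -> value == null ? null : String.valueOf(value);

    // Applies the same per-field converter to every field of a row.
    static List<String> convertRow(Object[] fields, Function<Object, String> converter) {
        List<String> out = new ArrayList<>();
        for (Object field : fields) {
            out.add(converter.apply(field));
        }
        return out;
    }

    public static void main(String[] args) {
        // The second field is a Long, like the timestamp-looking values in the failing row.
        Object[] row = {"68d533feffce9063b86a6971", 1758802942535L, null};

        System.out.println(convertRow(row, LENIENT_FIELD));

        try {
            convertRow(row, STRING_FIELD);
        } catch (ClassCastException e) {
            // Same exception type as the innermost "Caused by" in the trace above.
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

If this is what happens, checking which schema the Kafka sink uses for each `tableId` would be the place to start. The lenient converter here only shows the contrast and is not a proposed fix.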


