yjmgit opened a new issue, #9313:
URL: https://github.com/apache/seatunnel/issues/9313

   ### Search before asking
   
   - [x] I had searched in the 
[issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22)
 and found no similar issues.
   
   
   ### What happened
   
   我配置了 table-names = 
["unc_wlan.public.cdc_test_table_3","unc_wlan.public.cdc_test_table"],会报错
   
   `[974647096923652097] 2025-05-13 20:19:46,783 ERROR 
org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job 
(974647096923652097), Pipeline: [(1/1)], task: [pipeline-1 
[Source[0]-Postgres-CDC]-SourceTask (1/1)], taskGroupLocation: 
[TaskGroupLocation{jobId=974647096923652097, pipelineId=1, taskGroupId=2}] end 
with state FAILED and Exception: java.lang.RuntimeException: 
org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: 
ErrorCode:[COMMON-02], ErrorDescription:[Debezium JSON convert/parse 
'SeaTunnelRow{tableId=unc_wlan.public.cdc_test_table, kind=+I, fields=[335, 1, 
2]}' operation failed.]
        at 
org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:302)
        at 
org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:70)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
        at 
org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:75)
        at 
org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)
        at 
org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)
        at 
org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:72)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
        at 
org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:77)
        at 
org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:694)
        at 
org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1023)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
        at java.util.concurrent.FutureTask.run(FutureTask.java)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: 
ErrorCode:[COMMON-02], ErrorDescription:[Debezium JSON convert/parse 
'SeaTunnelRow{tableId=unc_wlan.public.cdc_test_table, kind=+I, fields=[335, 1, 
2]}' operation failed.]
        at 
org.apache.seatunnel.common.exception.CommonError.jsonOperationError(CommonError.java:207)
        at 
org.apache.seatunnel.format.json.debezium.DebeziumJsonSerializationSchema.serialize(DebeziumJsonSerializationSchema.java:68)
        at 
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer.lambda$valueExtractor$9(DefaultSeaTunnelRowSerializer.java:197)
        at 
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer.serializeRow(DefaultSeaTunnelRowSerializer.java:68)
        at 
org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkWriter.write(KafkaSinkWriter.java:98)
        at 
org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkWriter.write(KafkaSinkWriter.java:43)
        at 
org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:268)
        ... 18 more
   Caused by: org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: 
ErrorCode:[COMMON-02], ErrorDescription:[Common JSON convert/parse 
'SeaTunnelRow{tableId=, kind=+I, fields=[null, 
SeaTunnelRow{tableId=unc_wlan.public.cdc_test_table, kind=+I, fields=[335, 1, 
2]}, c]}' operation failed.]
        at 
org.apache.seatunnel.common.exception.CommonError.jsonOperationError(CommonError.java:207)
        at 
org.apache.seatunnel.format.json.JsonSerializationSchema.serialize(JsonSerializationSchema.java:84)
        at 
org.apache.seatunnel.format.json.debezium.DebeziumJsonSerializationSchema.serialize(DebeziumJsonSerializationSchema.java:55)
        ... 23 more
   Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to 
java.lang.Integer
        at 
org.apache.seatunnel.format.json.RowToJsonConverters$6.convert(RowToJsonConverters.java:117)
        at 
org.apache.seatunnel.format.json.RowToJsonConverters$1.convert(RowToJsonConverters.java:73)
        at 
org.apache.seatunnel.format.json.RowToJsonConverters$18.convert(RowToJsonConverters.java:235)
        at 
org.apache.seatunnel.format.json.RowToJsonConverters$1.convert(RowToJsonConverters.java:73)
        at 
org.apache.seatunnel.format.json.RowToJsonConverters$18.convert(RowToJsonConverters.java:235)
        at 
org.apache.seatunnel.format.json.RowToJsonConverters$1.convert(RowToJsonConverters.java:73)
        at 
org.apache.seatunnel.format.json.JsonSerializationSchema.serialize(JsonSerializationSchema.java:81)
        ... 24 more`
   
   如果配置一个表的话,就没问题。
   
   
   
   ### SeaTunnel Version
   
   dev
   
   2.3.11
   
   ### SeaTunnel Config
   
   ```conf
   env {
     execution.parallelism = 1
     job.mode = "STREAMING"
     checkpoint.interval = 10000
     read_limit.bytes_per_second=7000000
     read_limit.rows_per_second=400
   }
   
   source {
   
     Postgres-CDC {
       plugin_output = "jdbc_result_cdc_test_table"
       username = "postgresadmin"
       password = "rJ1#sDn@"
       database-names = ["unc_wlan","unc_wlan"]
       schema-names = ["pulic","pulic"]
       table-names = 
["unc_wlan.public.cdc_test_table_3","unc_wlan.public.cdc_test_table"]
       base-url = 
"jdbc:postgresql://172.28.205.124:25432/unc_wlan?loggerLevel=OFF"
        startup.mode = "latest"
        slot.name = "bb"
     }
   
   }
   
   
   sink {
   
     kafka {
       plugin_input = "jdbc_result_cdc_test_table"
       topic = "pg-cdc-test"
       bootstrap.servers = 
"172.28.205.121:9093,172.28.205.122:9093,172.28.205.123:9093"
       format = "debezium_json"
       kafka.config = {
         request.timeout.ms = 60000
         buffer.memory = 33554432
         # SSL Configuration
         security.protocol = "SSL"
         ssl.keystore.location = "/ssl/keystore.jks"
         ssl.keystore.password = "rJ1#sDn"
         ssl.key.password = "rJ1#sDn"
         ssl.truststore.location = "/ssl/truststore.jks"
         ssl.truststore.password = "rJ1#sDn"
         ssl.endpoint.identification.algorithm = ""
       }
     }
   }
   ```
   
   ### Running Command
   
   ```shell
   本地启动
   ```
   
   ### Error Exception
   
   ```log
   'SeaTunnelRow{tableId=unc_wlan.public.cdc_test_table, kind=+I, fields=[335, 
1, 2]}' operation failed.]
        at 
org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:302)
        at 
org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:70)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
        at 
org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:75)
        at 
org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50)
        at 
org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51)
        at 
org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:72)
        at 
org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
        at 
org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:77)
        at 
org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:694)
        at 
org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1023)
        at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
        at java.util.concurrent.FutureTask.run(FutureTask.java)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: 
ErrorCode:[COMMON-02], ErrorDescription:[Debezium JSON convert/parse 
'SeaTunnelRow{tableId=unc_wlan.public.cdc_test_table, kind=+I, fields=[335, 1, 
2]}' operation failed.]
        at 
org.apache.seatunnel.common.exception.CommonError.jsonOperationError(CommonError.java:207)
        at 
org.apache.seatunnel.format.json.debezium.DebeziumJsonSerializationSchema.serialize(DebeziumJsonSerializationSchema.java:68)
        at 
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer.lambda$valueExtractor$9(DefaultSeaTunnelRowSerializer.java:197)
        at 
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer.serializeRow(DefaultSeaTunnelRowSerializer.java:68)
        at 
org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkWriter.write(KafkaSinkWriter.java:98)
        at 
org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkWriter.write(KafkaSinkWriter.java:43)
        at 
org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:268)
        ... 18 more
   Caused by: org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: 
ErrorCode:[COMMON-02], ErrorDescription:[Common JSON convert/parse 
'SeaTunnelRow{tableId=, kind=+I, fields=[null, 
SeaTunnelRow{tableId=unc_wlan.public.cdc_test_table, kind=+I, fields=[335, 1, 
2]}, c]}' operation failed.]
        at 
org.apache.seatunnel.common.exception.CommonError.jsonOperationError(CommonError.java:207)
        at 
org.apache.seatunnel.format.json.JsonSerializationSchema.serialize(JsonSerializationSchema.java:84)
        at 
org.apache.seatunnel.format.json.debezium.DebeziumJsonSerializationSchema.serialize(DebeziumJsonSerializationSchema.java:55)
        ... 23 more
   Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to 
java.lang.Integer
        at 
org.apache.seatunnel.format.json.RowToJsonConverters$6.convert(RowToJsonConverters.java:117)
        at 
org.apache.seatunnel.format.json.RowToJsonConverters$1.convert(RowToJsonConverters.java:73)
        at 
org.apache.seatunnel.format.json.RowToJsonConverters$18.convert(RowToJsonConverters.java:235)
        at 
org.apache.seatunnel.format.json.RowToJsonConverters$1.convert(RowToJsonConverters.java:73)
        at 
org.apache.seatunnel.format.json.RowToJsonConverters$18.convert(RowToJsonConverters.java:235)
        at 
org.apache.seatunnel.format.json.RowToJsonConverters$1.convert(RowToJsonConverters.java:73)
        at 
org.apache.seatunnel.format.json.JsonSerializationSchema.serialize(JsonSerializationSchema.java:81)
        ... 24 more`
   ```
   
   ### Zeta or Flink or Spark Version
   
   zeta
   
   ### Java or Scala Version
   
   1.8
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to