cobolbaby opened a new issue, #7905: URL: https://github.com/apache/seatunnel/issues/7905
### Search before asking - [X] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues. ### What happened java.lang.NullPointerException: Cannot invoke "org.apache.kafka.connect.data.Struct.get(String)" because "struct" is null ### SeaTunnel Version 2.3.8 ### SeaTunnel Config ```conf env { # You can set engine configuration here execution.parallelism = 1 job.mode = "STREAMING" checkpoint.interval = 60000 read_limit.bytes_per_second=50000000 read_limit.rows_per_second=10000 } source { Postgres-CDC { # 源端数据库 JDBC Url base-url = "jdbc:postgresql://***/bdc" username = "***" password = "***" database-names = ["bdc"] schema-names = ["dw"] table-names = ["bdc.dw.fact_cpu_sn", "bdc.dw.dim_cpu_dn"] result_table_name = "SQT_PG_BDC_CDC_dw_cpu" } } transform { } sink { jdbc { # https://seatunnel.apache.org/docs/2.3.8/connector-v2/sink/Jdbc source_table_name = "SQT_PG_BDC_CDC_dw_cpu" url = "jdbc:postgresql://***/bdc_test" driver = "org.postgresql.Driver" user = "***" password = "***" # You need to configure both database and table database = "bdc_test" table = "dw_sqt.${table_name}" primary_keys = ["${primary_key}"] schema_save_mode = "IGNORE" generate_sink_sql = "true" } } ``` ### Running Command ```shell ./bin/start-seatunnel-flink-15-connector-v2.sh --config ./config/pgcdc2pg_bdc_dw_cpu.conf ``` ### Error Exception ```log ==> flink--standalonesession-0-0489b20f21c4.log <== 2024-10-24 18:24:50,728 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Postgres-CDC-Source -> MultiTableSink-Sink: Writer (1/1) (8c6dbb40f39dfb76d50b9c9bdc1362e4_cbc357ccb763df2852fee8c4fc7d55f2_0_5) switched from RUNNING to FAILED on 172.17.232.66:40409-cf2488 @ 0489b20f21c4 (dataPort=41559). java.lang.NullPointerException: null at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.convert(SeaTunnelRowDebeziumDeserializationConverters.java:88) ~[?:?] at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.extractBeforeRow(SeaTunnelRowDebeziumDeserializeSchema.java:223) ~[?:?] at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserializeDataChangeRecord(SeaTunnelRowDebeziumDeserializeSchema.java:188) ~[?:?] at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserialize(SeaTunnelRowDebeziumDeserializeSchema.java:111) ~[?:?] at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:198) ~[?:?] at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:150) ~[?:?] at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:101) ~[?:?] at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:61) ~[?:?] at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:110) ~[?:?] at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119) ~[?:?] at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:114) ~[?:?] at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119) ~[?:?] at org.apache.seatunnel.translation.flink.source.FlinkSourceReader.pollNext(FlinkSourceReader.java:80) ~[?:?] at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) ~[flink-dist-1.18.1.jar:1.18.1] at java.lang.Thread.run(Unknown Source) ~[?:?] 2024-10-24 18:24:50,729 INFO org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager [] - Clearing resource requirements of job 515026a838b438877c64bef449244e53 2024-10-24 18:24:50,729 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Removing registered reader after failure for subtask 0 (#5) of source Source: Postgres-CDC-Source. 2024-10-24 18:24:50,729 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - 1 tasks will be restarted to recover the failed task 8c6dbb40f39dfb76d50b9c9bdc1362e4_cbc357ccb763df2852fee8c4fc7d55f2_0_5. 2024-10-24 18:24:50,729 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job SeaTunnel (515026a838b438877c64bef449244e53) switched from state RUNNING to RESTARTING. ==> flink--taskexecutor-0-0489b20f21c4.log <== 2024-10-24 18:24:50,726 INFO io.debezium.jdbc.JdbcConnection [] - Connection gracefully closed 2024-10-24 18:24:50,726 INFO org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher [] - Split fetcher 0 exited. 2024-10-24 18:24:50,726 INFO org.apache.seatunnel.api.event.LoggingEventHandler [] - log event: ReaderCloseEvent(createdTime=1729794290726, jobId=515026a838b438877c64bef449244e53, eventType=LIFECYCLE_READER_CLOSE) 2024-10-24 18:24:50,726 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Postgres-CDC-Source -> MultiTableSink-Sink: Writer (1/1)#5 (8c6dbb40f39dfb76d50b9c9bdc1362e4_cbc357ccb763df2852fee8c4fc7d55f2_0_5) switched from RUNNING to FAILED with failure cause: java.lang.NullPointerException: Cannot invoke "org.apache.kafka.connect.data.Struct.get(String)" because "struct" is null at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializationConverters.convert(SeaTunnelRowDebeziumDeserializationConverters.java:88) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8] at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.extractBeforeRow(SeaTunnelRowDebeziumDeserializeSchema.java:223) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8] at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserializeDataChangeRecord(SeaTunnelRowDebeziumDeserializeSchema.java:188) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8] at org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema.deserialize(SeaTunnelRowDebeziumDeserializeSchema.java:111) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8] at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:198) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8] at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:150) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8] at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:101) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8] at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:61) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8] at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:110) ~[blob_p-9113e096f42818c442db749bb2f798c1c2e41d04-20bab250076a2147589ecdb43456d7d8:2.3.8] at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8] at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:114) ~[blob_p-9113e096f42818c442db749bb2f798c1c2e41d04-20bab250076a2147589ecdb43456d7d8:2.3.8] at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119) ~[blob_p-3841ea009d7820ab0fb82d5b17c4071d29d41517-aae02a148ea3fc01771561df539d814b:2.3.8] at org.apache.seatunnel.translation.flink.source.FlinkSourceReader.pollNext(FlinkSourceReader.java:80) ~[blob_p-9113e096f42818c442db749bb2f798c1c2e41d04-20bab250076a2147589ecdb43456d7d8:2.3.8] at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) ~[flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) [flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) [flink-dist-1.18.1.jar:1.18.1] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-dist-1.18.1.jar:1.18.1] at java.lang.Thread.run(Unknown Source) [?:?] 2024-10-24 18:24:50,726 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: Postgres-CDC-Source -> MultiTableSink-Sink: Writer (1/1)#5 (8c6dbb40f39dfb76d50b9c9bdc1362e4_cbc357ccb763df2852fee8c4fc7d55f2_0_5). 2024-10-24 18:24:50,727 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: Postgres-CDC-Source -> MultiTableSink-Sink: Writer (1/1)#5 8c6dbb40f39dfb76d50b9c9bdc1362e4_cbc357ccb763df2852fee8c4fc7d55f2_0_5. ``` ### Zeta or Flink or Spark Version Flink: 1.18.1 ### Java or Scala Version _No response_ ### Screenshots _No response_ ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
