yl-yue opened a new issue, #5850: URL: https://github.com/apache/paimon/issues/5850
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/paimon/issues) and found nothing similar.

### Paimon version

```
1.1.1+
1.2+
paimon-s3-1.1.1.jar
paimon-flink-action-1.1.1.jar
paimon-flink-1.20-1.1.1.jar
```

### Compute Engine

flink 1.20.1
flink-sql-connector-kafka-3.4.0-1.20.jar

### Minimal reproduce step

```json
{
  "id": 0,
  "database": "bd_input_test",
  "table": "column_type",
  "pkNames": [
    "id"
  ],
  "isDdl": false,
  "type": "INSERT",
  "es": 1751959178766,
  "ts": 1751959179914,
  "sql": "",
  "sqlType": {
    "id": 4,
    "test_tinyint": -6,
    "test_tinyint_unsigned": -6,
    "test_tinyint_1": -6,
    "test_tinyint_1_unsigned": -6,
    "test_bigint": -5,
    "test_bigint_unsigned": -5,
    "create_time": -5
  },
  "mysqlType": {
    "create_time": "bigint(0) unsigned",
    "id": "int(11)",
    "test_tinyint": "tinyint(4)",
    "test_tinyint_unsigned": "tinyint(3) unsigned",
    "test_tinyint_1": "tinyint(1)",
    "test_tinyint_1_unsigned": "tinyint(1) unsigned",
    "test_bigint": "bigint(20)",
    "test_bigint_unsigned": "bigint(20) unsigned"
  },
  "old": null,
  "data": [
    {
      "id": "5",
      "test_tinyint": "5",
      "test_tinyint_unsigned": "5",
      "test_tinyint_1": "0",
      "test_tinyint_1_unsigned": "1",
      "test_bigint": "5",
      "test_bigint_unsigned": "5",
      "create_time": "0"
    }
  ]
}
```

```bash
kafka_sync_database \
    --mode combined \
    --warehouse s3://bd-test-aaa/paimon_flink_BBB \
    --database input_bd_input_test \
    --including_dbs bd_input_test \
    --including_tables "column_type" \
    --ignore_incompatible true \
    --type_mapping char-to-string,to-nullable \
    --table_conf bucket=-2 \
    --table_conf sink.parallelism=1 \
    --table_conf changelog-producer=input \
    --catalog_conf metastore=filesystem \
    --catalog_conf uri=s3://bd-test-aaa/paimon_flink_BBB \
    --catalog_conf s3.endpoint=s3.ap-southeast-5.amazonaws.com \
    --catalog_conf s3.access-key=xxxxxxxxxxxxx \
    --catalog_conf s3.secret-key=xxxxxxxxxxxxxxxx \
    --kafka_conf value.format=canal-json \
    --kafka_conf scan.startup.mode=earliest-offset \
    --kafka_conf properties.bootstrap.servers=bd-kafka.xxxxxx:9092 \
    --kafka_conf topic=input_bd_input_test.column_type
```

### What doesn't meet your expectations?

```java
2025-07-08 07:21:31,080 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig [] - ConsumerConfig values:
	auto.offset.reset = earliest
	key.deserializer = class org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
	value.deserializer = class org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
...
2025-07-08 07:21:31,088 ERROR org.apache.paimon.flink.action.cdc.kafka.KafkaDebeziumJsonDeserializationSchema [] - Invalid Json: {"id":0,"database":"bd_input_test","table":"column_type","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1751959178766,"ts":1751959179914,"sql":"","sqlType":{"id":4,"test_tinyint":-6,"test_tinyint_unsigned":-6,"test_tinyint_1":-6,"test_tinyint_1_unsigned":-6,"test_bigint":-5,"test_bigint_unsigned":-5,"create_time":-5},"mysqlType":{"create_time":"bigint(0) unsigned","id":"int(11)","test_tinyint":"tinyint(4)","test_tinyint_unsigned":"tinyint(3) unsigned","test_tinyint_1":"tinyint(1)","test_tinyint_1_unsigned":"tinyint(1) unsigned","test_bigint":"bigint(20)","test_bigint_unsigned":"bigint(20) unsigned"},"old":null,"data":[{"id":"5","test_tinyint":"5","test_tinyint_unsigned":"5","test_tinyint_1":"0","test_tinyint_1_unsigned":"1","test_bigint":"5","test_bigint_unsigned":"5","create_time":"0"}]}
...
2025-07-08 07:21:31,089 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Kafka Source -> Parse -> Side Output (1/1)#5 (9a81d28b7f6dbf083984a759321c8094_cbc357ccb763df2852fee8c4fc7d55f2_0_5) switched from RUNNING to FAILED with failure cause:
java.io.IOException: Failed to deserialize consumer record due to
	at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56) ~[flink-sql-connector-kafka-3.4.0-1.20.jar:3.4.0-1.20]
	at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33) ~[flink-sql-connector-kafka-3.4.0-1.20.jar:3.4.0-1.20]
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203) ~[flink-connector-files-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:443) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949) [flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) [flink-dist-1.20.1.jar:1.20.1]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-dist-1.20.1.jar:1.20.1]
	at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = input_bd_input_test.column_type, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1751959179914, serialized key size = 0, serialized value size = 804, headers = RecordHeaders(headers = [], isReadOnly = false), key = [B@7d4932cc, value = [B@3b78bfde).
	at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:59) ~[flink-sql-connector-kafka-3.4.0-1.20.jar:3.4.0-1.20]
	at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ~[flink-sql-connector-kafka-3.4.0-1.20.jar:3.4.0-1.20]
	... 14 more
Caused by: org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input at [Source: (byte[])""; line: 1, column: 0]
	at org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59) ~[paimon-flink-1.20-1.2.0.jar:1.2.0]
	at org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4821) ~[paimon-flink-1.20-1.2.0.jar:1.2.0]
	at org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4723) ~[paimon-flink-1.20-1.2.0.jar:1.2.0]
	at org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3738) ~[paimon-flink-1.20-1.2.0.jar:1.2.0]
	at org.apache.paimon.flink.action.cdc.kafka.KafkaDebeziumJsonDeserializationSchema.deserialize(KafkaDebeziumJsonDeserializationSchema.java:70) ~[paimon-flink-1.20-1.2.0.jar:1.2.0]
	at org.apache.paimon.flink.action.cdc.kafka.KafkaDebeziumJsonDeserializationSchema.deserialize(KafkaDebeziumJsonDeserializationSchema.java:39) ~[paimon-flink-1.20-1.2.0.jar:1.2.0]
	at org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema.deserialize(KafkaDeserializationSchema.java:81) ~[flink-sql-connector-kafka-3.4.0-1.20.jar:3.4.0-1.20]
	at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[flink-sql-connector-kafka-3.4.0-1.20.jar:3.4.0-1.20]
	at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ~[flink-sql-connector-kafka-3.4.0-1.20.jar:3.4.0-1.20]
	... 14 more
```

### Anything else?

Paimon 1.0.1 is functioning normally. However, upgrading to Paimon 1.1.1 or 1.2.0 results in a "deserialization" error.

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!
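
A note on reading the trace: Jackson reports its input as `(byte[])""`, and the failing `ConsumerRecord` shows `serialized key size = 0` next to `serialized value size = 804`. One possible reading, not confirmed anywhere in this report, is that the zero-length record key rather than the value is what reaches `ObjectMapper.readValue`. The sketch below only illustrates the kind of guard that would avoid handing an empty payload to a JSON parser under that assumption. `EmptyPayloadGuard`, `isAbsent`, and `parseIfPresent` are hypothetical names, not Paimon or Flink APIs, and the parser is a plain stand-in so the example needs no JSON library.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical helper for illustration only; not part of Paimon or Flink. */
final class EmptyPayloadGuard {

    private EmptyPayloadGuard() {}

    /** True when the payload is null or zero-length, i.e. there is nothing to parse. */
    static boolean isAbsent(byte[] payload) {
        return payload == null || payload.length == 0;
    }

    /**
     * Applies the parser only when the payload has content; otherwise returns
     * Optional.empty(). The parser is injected so the sketch stays independent
     * of any particular JSON library.
     */
    static <T> Optional<T> parseIfPresent(byte[] payload, Function<byte[], T> parser) {
        if (isAbsent(payload)) {
            return Optional.empty();
        }
        return Optional.ofNullable(parser.apply(payload));
    }

    public static void main(String[] args) {
        // Mirrors "serialized key size = 0" from the log: present but empty.
        byte[] emptyKey = new byte[0];
        byte[] value = "{\"type\":\"INSERT\"}".getBytes(StandardCharsets.UTF_8);

        // Stand-in parser (assumption): decodes bytes to a String instead of real JSON parsing.
        Function<byte[], String> parser = bytes -> new String(bytes, StandardCharsets.UTF_8);

        System.out.println(parseIfPresent(emptyKey, parser).isPresent()); // false
        System.out.println(parseIfPresent(value, parser).isPresent());    // true
    }
}
```

If the empty key does turn out to be the trigger, producing records with a null key, or with a non-empty JSON key, might be a way to test that hypothesis against 1.1.1 and 1.2.0.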
