yl-yue opened a new issue, #5850:
URL: https://github.com/apache/paimon/issues/5850

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/paimon/issues) and found nothing similar.
   
   
   ### Paimon version
   
   ```
   1.1.1+
   1.2+
   paimon-s3-1.1.1.jar
   paimon-flink-action-1.1.1.jar
   paimon-flink-1.20-1.1.1.jar
   ```
   
   ### Compute Engine
   
   flink 1.20.1
   flink-sql-connector-kafka-3.4.0-1.20.jar
   
   ### Minimal reproduce step
   
   ```json
   {
       "id": 0,
       "database": "bd_input_test",
       "table": "column_type",
       "pkNames": [
           "id"
       ],
       "isDdl": false,
       "type": "INSERT",
       "es": 1751959178766,
       "ts": 1751959179914,
       "sql": "",
       "sqlType": {
           "id": 4,
           "test_tinyint": -6,
           "test_tinyint_unsigned": -6,
           "test_tinyint_1": -6,
           "test_tinyint_1_unsigned": -6,
           "test_bigint": -5,
           "test_bigint_unsigned": -5,
           "create_time": -5
       },
       "mysqlType": {
           "create_time": "bigint(0) unsigned",
           "id": "int(11)",
           "test_tinyint": "tinyint(4)",
           "test_tinyint_unsigned": "tinyint(3) unsigned",
           "test_tinyint_1": "tinyint(1)",
           "test_tinyint_1_unsigned": "tinyint(1) unsigned",
           "test_bigint": "bigint(20)",
           "test_bigint_unsigned": "bigint(20) unsigned"
       },
       "old": null,
       "data": [
           {
               "id": "5",
               "test_tinyint": "5",
               "test_tinyint_unsigned": "5",
               "test_tinyint_1": "0",
               "test_tinyint_1_unsigned": "1",
               "test_bigint": "5",
               "test_bigint_unsigned": "5",
               "create_time": "0"
           }
       ]
   }
   ```
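The sample message above is itself well-formed canal-json; a quick sanity check (a sketch, using Python's `json` module with the record compacted onto one line) confirms it parses, so the later "Invalid Json" log line cannot be about this value:

```python
import json

# The canal-json value from the report, verbatim but compacted; if this parses,
# the record value itself is valid JSON.
raw = (
    '{"id":0,"database":"bd_input_test","table":"column_type",'
    '"pkNames":["id"],"isDdl":false,"type":"INSERT",'
    '"es":1751959178766,"ts":1751959179914,"sql":"",'
    '"sqlType":{"id":4,"test_tinyint":-6,"test_tinyint_unsigned":-6,'
    '"test_tinyint_1":-6,"test_tinyint_1_unsigned":-6,'
    '"test_bigint":-5,"test_bigint_unsigned":-5,"create_time":-5},'
    '"mysqlType":{"create_time":"bigint(0) unsigned","id":"int(11)",'
    '"test_tinyint":"tinyint(4)","test_tinyint_unsigned":"tinyint(3) unsigned",'
    '"test_tinyint_1":"tinyint(1)","test_tinyint_1_unsigned":"tinyint(1) unsigned",'
    '"test_bigint":"bigint(20)","test_bigint_unsigned":"bigint(20) unsigned"},'
    '"old":null,"data":[{"id":"5","test_tinyint":"5","test_tinyint_unsigned":"5",'
    '"test_tinyint_1":"0","test_tinyint_1_unsigned":"1","test_bigint":"5",'
    '"test_bigint_unsigned":"5","create_time":"0"}]}'
)
record = json.loads(raw)  # raises if the payload were actually invalid
print(record["type"], record["pkNames"])  # INSERT ['id']
```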
   
   ```bash
   kafka_sync_database \
    --mode combined \
    --warehouse s3://bd-test-aaa/paimon_flink_BBB \
    --database input_bd_input_test \
    --including_dbs bd_input_test \
    --including_tables "column_type" \
    --ignore_incompatible true \
    --type_mapping char-to-string,to-nullable \
    --table_conf bucket=-2 \
    --table_conf sink.parallelism=1 \
    --table_conf changelog-producer=input \
    --catalog_conf metastore=filesystem \
    --catalog_conf uri=s3://bd-test-aaa/paimon_flink_BBB \
    --catalog_conf s3.endpoint=s3.ap-southeast-5.amazonaws.com \
    --catalog_conf s3.access-key=xxxxxxxxxxxxx \
    --catalog_conf s3.secret-key=xxxxxxxxxxxxxxxx \
    --kafka_conf value.format=canal-json \
    --kafka_conf scan.startup.mode=earliest-offset \
    --kafka_conf properties.bootstrap.servers=bd-kafka.xxxxxx:9092 \
    --kafka_conf topic=input_bd_input_test.column_type
   ```
   
   ### What doesn't meet your expectations?
   
   ```java
   2025-07-08 07:21:31,080 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig [] - ConsumerConfig values: 
        auto.offset.reset = earliest
        key.deserializer = class org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
        value.deserializer = class org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
   ...
   
   2025-07-08 07:21:31,088 ERROR org.apache.paimon.flink.action.cdc.kafka.KafkaDebeziumJsonDeserializationSchema [] - Invalid Json:
   {"id":0,"database":"bd_input_test","table":"column_type","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1751959178766,"ts":1751959179914,"sql":"","sqlType":{"id":4,"test_tinyint":-6,"test_tinyint_unsigned":-6,"test_tinyint_1":-6,"test_tinyint_1_unsigned":-6,"test_bigint":-5,"test_bigint_unsigned":-5,"create_time":-5},"mysqlType":{"create_time":"bigint(0) unsigned","id":"int(11)","test_tinyint":"tinyint(4)","test_tinyint_unsigned":"tinyint(3) unsigned","test_tinyint_1":"tinyint(1)","test_tinyint_1_unsigned":"tinyint(1) unsigned","test_bigint":"bigint(20)","test_bigint_unsigned":"bigint(20) unsigned"},"old":null,"data":[{"id":"5","test_tinyint":"5","test_tinyint_unsigned":"5","test_tinyint_1":"0","test_tinyint_1_unsigned":"1","test_bigint":"5","test_bigint_unsigned":"5","create_time":"0"}]}
   ...
   
   2025-07-08 07:21:31,089 WARN  org.apache.flink.runtime.taskmanager.Task [] - Source: Kafka Source -> Parse -> Side Output (1/1)#5 (9a81d28b7f6dbf083984a759321c8094_cbc357ccb763df2852fee8c4fc7d55f2_0_5) switched from RUNNING to FAILED with failure cause:
   java.io.IOException: Failed to deserialize consumer record due to
        at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56) ~[flink-sql-connector-kafka-3.4.0-1.20.jar:3.4.0-1.20]
        at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33) ~[flink-sql-connector-kafka-3.4.0-1.20.jar:3.4.0-1.20]
        at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203) ~[flink-connector-files-1.20.1.jar:1.20.1]
        at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:443) ~[flink-dist-1.20.1.jar:1.20.1]
        at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.20.1.jar:1.20.1]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.20.1.jar:1.20.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638) ~[flink-dist-1.20.1.jar:1.20.1]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.20.1.jar:1.20.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973) ~[flink-dist-1.20.1.jar:1.20.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917) ~[flink-dist-1.20.1.jar:1.20.1]
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.1.jar:1.20.1]
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949) [flink-dist-1.20.1.jar:1.20.1]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) [flink-dist-1.20.1.jar:1.20.1]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-dist-1.20.1.jar:1.20.1]
        at java.lang.Thread.run(Unknown Source) [?:?]
   Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = input_bd_input_test.column_type, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1751959179914, serialized key size = 0, serialized value size = 804, headers = RecordHeaders(headers = [], isReadOnly = false), key = [B@7d4932cc, value = [B@3b78bfde).
        at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:59) ~[flink-sql-connector-kafka-3.4.0-1.20.jar:3.4.0-1.20]
        at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ~[flink-sql-connector-kafka-3.4.0-1.20.jar:3.4.0-1.20]
        ... 14 more
   Caused by: org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
    at [Source: (byte[])""; line: 1, column: 0]
        at org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59) ~[paimon-flink-1.20-1.2.0.jar:1.2.0]
        at org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4821) ~[paimon-flink-1.20-1.2.0.jar:1.2.0]
        at org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4723) ~[paimon-flink-1.20-1.2.0.jar:1.2.0]
        at org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3738) ~[paimon-flink-1.20-1.2.0.jar:1.2.0]
        at org.apache.paimon.flink.action.cdc.kafka.KafkaDebeziumJsonDeserializationSchema.deserialize(KafkaDebeziumJsonDeserializationSchema.java:70) ~[paimon-flink-1.20-1.2.0.jar:1.2.0]
        at org.apache.paimon.flink.action.cdc.kafka.KafkaDebeziumJsonDeserializationSchema.deserialize(KafkaDebeziumJsonDeserializationSchema.java:39) ~[paimon-flink-1.20-1.2.0.jar:1.2.0]
        at org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema.deserialize(KafkaDeserializationSchema.java:81) ~[flink-sql-connector-kafka-3.4.0-1.20.jar:3.4.0-1.20]
        at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[flink-sql-connector-kafka-3.4.0-1.20.jar:3.4.0-1.20]
        at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ~[flink-sql-connector-kafka-3.4.0-1.20.jar:3.4.0-1.20]
        ... 14 more
   ```
   
   ### Anything else?
   
   The same job works fine on Paimon 1.0.1. After upgrading to Paimon 1.1.1 or 1.2.0, the same topic fails with the deserialization error above. Note that the failing `ConsumerRecord` reports `serialized key size = 0` alongside a valid 804-byte value, and the exception source is `(byte[])""`, so the empty input being parsed appears to be the record key rather than the record value.
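Not a fix, but a minimal sketch of the failure mode visible in the trace: the shaded Jackson `ObjectMapper` is handed zero-length input (`Source: (byte[])""`), and any JSON parser rejects empty input with exactly this class of error. Python's `json` module is used here as a stand-in for the shaded Jackson; the byte sizes mirror the `ConsumerRecord` line (`serialized key size = 0, serialized value size = 804`), and `valid_value` is a shortened stand-in for the real 804-byte payload:

```python
import json

empty_key = b""                               # zero-byte record key, as in the log
valid_value = b'{"id": 0, "type": "INSERT"}'  # stand-in for the valid canal-json value

parsed = json.loads(valid_value)              # the record value parses fine
print(parsed["type"])                         # INSERT

try:
    # Parsing the empty key fails, analogous to Jackson's
    # "No content to map due to end-of-input".
    json.loads(empty_key)
except json.JSONDecodeError as e:
    print("empty input rejected:", e.msg)
```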
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
