This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d1a44afcd [flink] Fix Debezium Avro deserialization for multi topic 
source (#7871)
4d1a44afcd is described below

commit 4d1a44afcda60a37d6ca72287c206e85d098ad20
Author: Arnav Balyan <[email protected]>
AuthorDate: Sat May 23 20:19:10 2026 +0530

    [flink] Fix Debezium Avro deserialization for multi topic source (#7871)
    
    - KafkaDebeziumAvroDeserializationSchema returns 1 topic via
    `findOneTopic` at init time, this gets reused for every incoming record.
    - With multi topic configs (topic=a;b), any message coming from another
    topic leads to a crash with `SerializationException: The given schema
    does not match`. The same cached topic is also passed into
    `CdcSourceRecord`, misrouting records to the wrong table.
    - Read topic from the actual message for deserialize instead of using
    the cached value at init time.
       - Closes user reported bug #7859.
---
 .../flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java
index 6c53dd05d5..dc36151e7b 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java
@@ -41,14 +41,12 @@ public class KafkaDebeziumAvroDeserializationSchema
 
     private static final long serialVersionUID = 1L;
 
-    private final String topic;
     private final String schemaRegistryUrl;
 
     /** The deserializer to deserialize Debezium Avro data. */
     private ConfluentAvroDeserializationSchema avroDeserializer;
 
     public KafkaDebeziumAvroDeserializationSchema(Configuration 
cdcSourceConfig) {
-        this.topic = KafkaActionUtils.findOneTopic(cdcSourceConfig);
         this.schemaRegistryUrl = cdcSourceConfig.get(SCHEMA_REGISTRY_URL);
     }
 
@@ -69,6 +67,7 @@ public class KafkaDebeziumAvroDeserializationSchema
             initAvroDeserializer();
         }
 
+        String topic = message.topic();
         GenericContainerWithVersion keyContainerWithVersion =
                 this.avroDeserializer.deserialize(topic, true, message.key());
         GenericContainerWithVersion valueContainerWithVersion =

Reply via email to