This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4d1a44afcd [flink] Fix Debezium Avro deserialization for multi topic
source (#7871)
4d1a44afcd is described below
commit 4d1a44afcda60a37d6ca72287c206e85d098ad20
Author: Arnav Balyan <[email protected]>
AuthorDate: Sat May 23 20:19:10 2026 +0530
[flink] Fix Debezium Avro deserialization for multi topic source (#7871)
- `KafkaDebeziumAvroDeserializationSchema` resolves a single topic via
`findOneTopic` at init time and reuses it for every incoming record.
- With a multi-topic config (topic=a;b), any message arriving from a
topic other than the cached one crashes with `SerializationException:
The given schema does not match`. The same cached topic is also passed
into `CdcSourceRecord`, misrouting records to the wrong table.
- Read the topic from the actual message during deserialization instead
of using the value cached at init time.
- Closes user-reported bug #7859.
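
The difference between the two behaviours can be sketched in isolation. The
following is a toy model, not Paimon or Confluent code: `TopicRoutingSketch`,
`Message`, `REGISTRY`, and both helper methods are hypothetical names, and the
"one schema per topic" registry is an assumption made only to reproduce the
mismatch described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TopicRoutingSketch {

    /** Hypothetical stand-in for a Kafka record: source topic plus the schema it was written with. */
    public static final class Message {
        final String topic;
        final String schemaName;

        public Message(String topic, String schemaName) {
            this.topic = topic;
            this.schemaName = schemaName;
        }
    }

    /** Toy registry (assumption): each topic has exactly one registered schema. */
    static final Map<String, String> REGISTRY = Map.of("a", "SchemaA", "b", "SchemaB");

    /** Toy deserialize: fails when the message schema is not the one registered for lookupTopic. */
    static String deserialize(String lookupTopic, Message message) {
        String expected = REGISTRY.get(lookupTopic);
        if (!message.schemaName.equals(expected)) {
            throw new IllegalStateException("schema mismatch for topic " + lookupTopic);
        }
        return lookupTopic + ":" + message.schemaName;
    }

    /** Old behaviour: one topic chosen up front and reused for every record. */
    public static List<String> withCachedTopic(String cachedTopic, List<Message> messages) {
        List<String> results = new ArrayList<>();
        for (Message m : messages) {
            try {
                results.add(deserialize(cachedTopic, m));
            } catch (IllegalStateException e) {
                results.add("failed:" + m.topic);
            }
        }
        return results;
    }

    /** New behaviour: the lookup topic is taken from each record. */
    public static List<String> withRecordTopic(List<Message> messages) {
        List<String> results = new ArrayList<>();
        for (Message m : messages) {
            results.add(deserialize(m.topic, m));
        }
        return results;
    }

    public static void main(String[] args) {
        List<Message> messages =
                List.of(new Message("a", "SchemaA"), new Message("b", "SchemaB"));
        System.out.println(withCachedTopic("a", messages)); // [a:SchemaA, failed:b]
        System.out.println(withRecordTopic(messages));      // [a:SchemaA, b:SchemaB]
    }
}
```

In this sketch the cached variant only succeeds for records from the topic
that happened to be picked at init time, which mirrors the reported crash.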
---
.../flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java
index 6c53dd05d5..dc36151e7b 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumAvroDeserializationSchema.java
@@ -41,14 +41,12 @@ public class KafkaDebeziumAvroDeserializationSchema
private static final long serialVersionUID = 1L;
- private final String topic;
private final String schemaRegistryUrl;
/** The deserializer to deserialize Debezium Avro data. */
private ConfluentAvroDeserializationSchema avroDeserializer;
public KafkaDebeziumAvroDeserializationSchema(Configuration
cdcSourceConfig) {
- this.topic = KafkaActionUtils.findOneTopic(cdcSourceConfig);
this.schemaRegistryUrl = cdcSourceConfig.get(SCHEMA_REGISTRY_URL);
}
@@ -69,6 +67,7 @@ public class KafkaDebeziumAvroDeserializationSchema
initAvroDeserializer();
}
+ String topic = message.topic();
GenericContainerWithVersion keyContainerWithVersion =
this.avroDeserializer.deserialize(topic, true, message.key());
GenericContainerWithVersion valueContainerWithVersion =