vikasap opened a new issue, #7859:
URL: https://github.com/apache/paimon/issues/7859
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/paimon/issues) and found nothing similar.

### Paimon version

1.4.1

### Compute Engine

Flink

### Minimal reproduce step

1. Create two Kafka topics whose Debezium Avro records are registered under different Confluent Schema Registry subjects, for example:

   ```text
   db.public.orders
   db.public.users
   ```

2. Run a single Paimon `kafka_sync_database` job over both topics with Debezium Avro:

   ```bash
   kafka_sync_database \
       --warehouse /tmp/paimon \
       --database test_db \
       --kafka_conf properties.bootstrap.servers=<broker> \
       --kafka_conf properties.group.id=paimon-multi-topic-avro-repro \
       --kafka_conf value.format=debezium-avro \
       --kafka_conf schema.registry.url=<schema-registry-url> \
       --kafka_conf 'topic=db.public.orders;db.public.users'
   ```

   Equivalent repro with `topic-pattern`:

   ```bash
   --kafka_conf 'topic-pattern=db\.public\.(orders|users)'
   ```

3. Produce records to both topics and let the job consume them.

Expected: each record is deserialized against the subject of its own topic, e.g. `db.public.orders-key` or `db.public.users-key`.

Actual: Paimon resolves one topic once and uses that topic's subject for records from both topics, so Schema Registry lookups and deserialization fail for records from the other topic.

### What doesn't meet your expectations?
We see the following stack trace when running `kafka_sync_database`:

```text
java.io.IOException: Failed to deserialize consumer record due to
	at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
	at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:164)
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:493)
	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id <schema-id>
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:167)
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:215)
	at org.apache.paimon.flink.action.cdc.serialization.ConfluentAvroDeserializationSchema.deserialize(ConfluentAvroDeserializationSchema.java:39)
	at org.apache.paimon.flink.action.cdc.kafka.KafkaDebeziumAvroDeserializationSchema.deserialize(KafkaDebeziumAvroDeserializationSchema.java:73)
	at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
	... 14 common frames omitted
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: The given schema does not match any schema under the subject <topic-name>-key; error code: 40403
	at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:226)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:319)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:307)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:158)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:271)
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.schemaVersion(AbstractKafkaAvroDeserializer.java:184)
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:153)
	... 18 common frames omitted
```

### Anything else?

Brief analysis:

`kafka_sync_database` with `value.format=debezium-avro` fails for multi-topic Kafka sources because `KafkaDebeziumAvroDeserializationSchema` resolves a single topic from the source config during initialization and reuses it for every record. When the source uses `topic-pattern` or a multi-topic `topic` list, records from the other topics are deserialized against the wrong Confluent Schema Registry subject. The subject lookup then fails (error code 40403 above) before Paimon can route the event using its Debezium metadata.

The deserializer should instead use `ConsumerRecord.topic()` for each record, both when deserializing the key/value and when creating the `CdcSourceRecord`.

### Are you willing to submit a PR?

- [x] I'm willing to submit a PR!
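The difference between the current and the proposed behavior can be illustrated with a minimal, dependency-free Java sketch. It is not Paimon's code: `Rec`, `FixedTopicResolver`, and `PerRecordResolver` are hypothetical names, `Rec` stands in for Kafka's `ConsumerRecord` (whose `topic()` accessor is what the analysis proposes to use), keeping only the first entry of the `topic` list is an assumed way of "resolving a single topic", and the subject naming assumes Confluent's default `TopicNameStrategy` (`<topic>-key` / `<topic>-value`). It only shows which Schema Registry subject each approach would look up.

```java
import java.util.Arrays;
import java.util.List;

public class PerRecordSubjectSketch {

    /** Hypothetical stand-in for Kafka's ConsumerRecord; only the topic matters here. */
    static final class Rec {
        private final String topic;

        Rec(String topic) {
            this.topic = topic;
        }

        String topic() {
            return topic;
        }
    }

    /** Assumed subject naming of Confluent's default TopicNameStrategy. */
    static String subjectFor(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    /** Shape of the reported bug: one topic is fixed at construction and reused for every record. */
    static final class FixedTopicResolver {
        private final String topic;

        FixedTopicResolver(String topicOption) {
            // Hypothetical: keep only the first entry of a "a;b" topic list.
            this.topic = topicOption.split(";")[0];
        }

        String keySubject(Rec record) {
            // Ignores record.topic(), so every record maps to the same subject.
            return subjectFor(topic, true);
        }
    }

    /** Proposed shape: derive the subject from each record's own topic. */
    static final class PerRecordResolver {
        String keySubject(Rec record) {
            return subjectFor(record.topic(), true);
        }
    }

    public static void main(String[] args) {
        List<Rec> records = Arrays.asList(new Rec("db.public.orders"), new Rec("db.public.users"));
        FixedTopicResolver fixed = new FixedTopicResolver("db.public.orders;db.public.users");
        PerRecordResolver perRecord = new PerRecordResolver();
        for (Rec r : records) {
            // Print the subject each strategy would query for this record's key.
            System.out.println(r.topic() + " fixed=" + fixed.keySubject(r)
                    + " perRecord=" + perRecord.keySubject(r));
        }
    }
}
```

Running `main` shows the fixed resolver returning `db.public.orders-key` for the `db.public.users` record, while the per-record resolver returns `db.public.users-key`, the subject the expected behavior above names for that topic.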
