vikasap opened a new issue, #7859:
URL: https://github.com/apache/paimon/issues/7859

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/paimon/issues) and found nothing similar.
   
   
   ### Paimon version
   
   1.4.1
   
   ### Compute Engine
   
   Flink
   
   ### Minimal reproduce step
   
   Minimal repro:
   
   1. Create two Kafka topics with Debezium Avro records registered under different Confluent Schema Registry subjects, for example:
   
   ```text
   db.public.orders
   db.public.users
   ```
   
   2. Run one Paimon `kafka_sync_database` job that reads both topics as Debezium Avro:
   
   ```bash
   kafka_sync_database \
     --warehouse /tmp/paimon \
     --database test_db \
     --kafka_conf properties.bootstrap.servers=<broker> \
     --kafka_conf properties.group.id=paimon-multi-topic-avro-repro \
     --kafka_conf value.format=debezium-avro \
     --kafka_conf schema.registry.url=<schema-registry-url> \
     --kafka_conf 'topic=db.public.orders;db.public.users'
   ```
   
   Equivalent repro with `topic-pattern`:
   
   ```bash
   --kafka_conf 'topic-pattern=db\.public\.(orders|users)'
   ```
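   Both forms attach one source, and therefore one deserializer instance, to the two topics. A minimal, dependency-free Java sketch of that selection, assuming `topic` is split on `;` and `topic-pattern` must match the whole topic name (as Kafka's regex subscription does). `TopicSelectionSketch` and its methods are hypothetical illustrations, not Paimon's option parsing, and `db.public.audit` is an invented non-matching topic:
   
   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.regex.Pattern;
   import java.util.stream.Collectors;
   
   public class TopicSelectionSketch {
   
       // Hypothetical: splits a semicolon-separated `topic` value into topic names.
       static List<String> parseTopicList(String topicConf) {
           return Arrays.stream(topicConf.split(";"))
                   .map(String::trim)
                   .filter(t -> !t.isEmpty())
                   .collect(Collectors.toList());
       }
   
       // Hypothetical: keeps the candidate topics whose full name matches the regex.
       static List<String> matchPattern(String regex, List<String> candidates) {
           Pattern pattern = Pattern.compile(regex);
           return candidates.stream()
                   .filter(t -> pattern.matcher(t).matches())
                   .collect(Collectors.toList());
       }
   
       public static void main(String[] args) {
           List<String> fromList = parseTopicList("db.public.orders;db.public.users");
           List<String> fromPattern = matchPattern(
                   "db\\.public\\.(orders|users)",
                   List.of("db.public.orders", "db.public.users", "db.public.audit"));
           System.out.println(fromList);    // [db.public.orders, db.public.users]
           System.out.println(fromPattern); // [db.public.orders, db.public.users]
       }
   }
   ```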
   
   3. Produce records to both topics and let the job consume them.
   
   Expected: each record is deserialized using the Schema Registry subject of its own topic, e.g. `db.public.orders-key` for records from `db.public.orders` and `db.public.users-key` for records from `db.public.users`.
   
   Actual: Paimon resolves a single topic once and uses its subject for records from both topics, so Schema Registry lookup/deserialization fails for records from the other topic.
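   The difference between expected and actual comes down to which topic feeds the subject name. A minimal Java sketch, assuming Confluent's default `TopicNameStrategy` (subject = `<topic>-key` or `<topic>-value`). `SubjectNamingSketch` is hypothetical, and this report does not say which topic is resolved at initialization, so the sketch simply takes the first one:
   
   ```java
   import java.util.List;
   
   public class SubjectNamingSketch {
   
       // Default Confluent subject naming: "<topic>-key" for keys, "<topic>-value" for values.
       static String subjectFor(String topic, boolean isKey) {
           return topic + (isKey ? "-key" : "-value");
       }
   
       public static void main(String[] args) {
           List<String> recordTopics = List.of("db.public.orders", "db.public.users");
   
           // Reported behavior: one topic, resolved at initialization, is reused for every record.
           String topicResolvedAtInit = recordTopics.get(0);
           for (String topic : recordTopics) {
               System.out.println(topic
                       + " -> actual " + subjectFor(topicResolvedAtInit, true)
                       + ", expected " + subjectFor(topic, true));
           }
       }
   }
   ```
   
   For the `db.public.users` record, the sketch's "actual" subject is `db.public.orders-key` while the expected one is `db.public.users-key`, which matches the 40403 lookup failure below.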
   
   ### What doesn't meet your expectations?
   
   We see the following stack trace when running the `kafka_sync_database` action:
   
   ```text
   java.io.IOException: Failed to deserialize consumer record due to
       at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
       at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
       at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:164)
       at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:493)
       at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
       at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
       at java.lang.Thread.run(Unknown Source)
   
   Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id <schema-id>
       at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:167)
       at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:215)
       at org.apache.paimon.flink.action.cdc.serialization.ConfluentAvroDeserializationSchema.deserialize(ConfluentAvroDeserializationSchema.java:39)
       at org.apache.paimon.flink.action.cdc.kafka.KafkaDebeziumAvroDeserializationSchema.deserialize(KafkaDebeziumAvroDeserializationSchema.java:73)
       at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
       ... 14 common frames omitted
   
   Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: The given schema does not match any schema under the subject <topic-name>-key; error code: 40403
       at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:226)
       at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
       at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:319)
       at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:307)
       at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:158)
       at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:271)
       at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.schemaVersion(AbstractKafkaAvroDeserializer.java:184)
       at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:153)
       ... 18 common frames omitted
   ```
   
   ### Anything else?
   
   Brief analysis:
   
   `kafka_sync_database` with `value.format=debezium-avro` fails for multi-topic Kafka sources because `KafkaDebeziumAvroDeserializationSchema` resolves a single topic from the source config during initialization and reuses it for every record. When the source uses `topic-pattern` or a multi-topic `topic` list, records from the other topics are deserialized using the wrong Confluent Schema Registry subject, so schema lookup/deserialization fails before Paimon can route the event by its Debezium metadata.
   
   The deserializer should instead use `ConsumerRecord.topic()` for each record, both when deserializing the key/value and when creating the `CdcSourceRecord`.
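   The proposed change can be modeled without Kafka or Schema Registry dependencies. This is a hypothetical Java sketch, not Paimon's code: `SourceRecord` stands in for `ConsumerRecord`, and the in-memory `REGISTRY` map stands in for the subject/version lookup (`lookUpSubjectVersion` in the trace above), failing in the same way when the writer schema is not registered under the derived subject. The schema IDs and the exception message are invented for illustration:
   
   ```java
   import java.util.List;
   import java.util.Map;
   import java.util.Set;
   
   public class PerRecordTopicSketch {
   
       // Hypothetical stand-in for a Kafka ConsumerRecord: only the fields the sketch needs.
       static final class SourceRecord {
           final String topic;
           final int keySchemaId;
   
           SourceRecord(String topic, int keySchemaId) {
               this.topic = topic;
               this.keySchemaId = keySchemaId;
           }
       }
   
       // Hypothetical stand-in for Schema Registry: subject -> schema IDs registered under it.
       static final Map<String, Set<Integer>> REGISTRY = Map.of(
               "db.public.orders-key", Set.of(1),
               "db.public.users-key", Set.of(2));
   
       // Simulates the subject/version lookup: fails if the schema is not under the subject.
       static String lookUpKeySubject(String topic, int schemaId) {
           String subject = topic + "-key";
           if (!REGISTRY.getOrDefault(subject, Set.of()).contains(schemaId)) {
               throw new IllegalStateException(
                       "schema " + schemaId + " not under subject " + subject + " (simulated 40403)");
           }
           return subject;
       }
   
       // Reported behavior: the topic is fixed once, when the deserializer is created.
       static String deserializeWithFixedTopic(String initTopic, SourceRecord r) {
           return lookUpKeySubject(initTopic, r.keySchemaId);
       }
   
       // Proposed behavior: derive the subject from each record's own topic.
       static String deserializeWithRecordTopic(SourceRecord r) {
           return lookUpKeySubject(r.topic, r.keySchemaId);
       }
   
       public static void main(String[] args) {
           List<SourceRecord> records = List.of(
                   new SourceRecord("db.public.orders", 1),
                   new SourceRecord("db.public.users", 2));
           for (SourceRecord r : records) {
               System.out.println("per-record: " + deserializeWithRecordTopic(r));
               try {
                   System.out.println("fixed: " + deserializeWithFixedTopic("db.public.orders", r));
               } catch (IllegalStateException e) {
                   System.out.println("fixed: " + e.getMessage());
               }
           }
       }
   }
   ```
   
   With the fixed topic, the `db.public.users` record fails the lookup under `db.public.orders-key`, while the per-record variant resolves `db.public.users-key`. Per the analysis above, the same per-record topic would also be carried into the `CdcSourceRecord`.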
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!

