[ 
https://issues.apache.org/jira/browse/BEAM-10759?focusedWorklogId=472947&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-472947
 ]

ASF GitHub Bot logged work on BEAM-10759:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Aug/20 15:31
            Start Date: 20/Aug/20 15:31
    Worklog Time Spent: 10m 
      Work Description: iemejia commented on a change in pull request #12630:
URL: https://github.com/apache/beam/pull/12630#discussion_r474070383



##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java
##########
@@ -91,7 +91,10 @@
             .put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)
             .build();
     Deserializer<T> deserializer =
-        (Deserializer<T>) new KafkaAvroDeserializer(getSchemaRegistryClient());
+        (Deserializer<T>)
+            new ConfluentSchemaRegistryDeserializer(
+                getSchemaRegistryClient(),
+                new Schema.Parser().parse(getSchemaMetadata().getSchema()));

Review comment:
      This same expression is used in the Coder path for a similar purpose; can we maybe extract it into a private method and reuse it?
   ```
   private Schema getAvroSchema() {
     return new Schema.Parser().parse(getSchemaMetadata().getSchema());
   }
   ```
   

##########
File path: sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProviderTest.java
##########
@@ -45,7 +45,8 @@ public void testGetCoder() {
     assertEquals(AVRO_SCHEMA, coderV0.getSchema());
 
     try {
-      Integer version = mockRegistryClient.register(subject, AVRO_SCHEMA_V1);
+      mockRegistryClient.register(subject, AVRO_SCHEMA_V1);

Review comment:
      How is this different? Or rather, how does this test that the correct schema is chosen? Maybe we can add a specific test for that?

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java
##########
@@ -116,3 +119,17 @@ private SchemaRegistryClient getSchemaRegistryClient() {
     return this.schemaRegistryClientProviderFn.apply(null);
   }
 }
+
+class ConfluentSchemaRegistryDeserializer extends KafkaAvroDeserializer {
+  Schema readerSchema;
+
+  ConfluentSchemaRegistryDeserializer(SchemaRegistryClient client, Schema readerSchema) {
+    this.schemaRegistry = client;

Review comment:
       super(client);




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 472947)
    Time Spent: 0.5h  (was: 20m)

> KafkaIO with Avro deserializer fails with evolved schema
> --------------------------------------------------------
>
>                 Key: BEAM-10759
>                 URL: https://issues.apache.org/jira/browse/BEAM-10759
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka
>    Affects Versions: 2.23.0
>            Reporter: Dennis Yung
>            Assignee: Dennis Yung
>            Priority: P2
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> When using KafkaIO with ConfluentSchemaRegistryDeserializerProvider, an 
> exception can be thrown when consuming a topic with an evolved schema.
> This is because when the DeserializerProvider is initialized, it creates an 
> AvroCoder instance using either the latest Avro schema by default, or a 
> specific version if provided.
> If the Kafka topic contains records with multiple schema versions, AvroCoder 
> will fail to encode records with different schemas. The specific exception 
> differs depending on the schema change; for example, I have encountered type 
> cast errors and null pointer errors.
> To fix this issue, we can make use of the writer/reader schema arguments from 
> Avro to deserialize Kafka records to the same schema as the AvroCoder. This 
> method is available in io.confluent.kafka.serializers.KafkaAvroDeserializer:
> {code:java}
>     public Object deserialize(String s, byte[] bytes, Schema readerSchema) {
>         return this.deserialize(bytes, readerSchema);
>     }
> {code}
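The writer/reader schema mechanism the reporter refers to can be illustrated without the Confluent or Avro libraries. The sketch below is a conceptual model only, not Beam or Confluent API: a "schema" is modelled as an ordered map of field name to default value, and the hypothetical `resolve` helper mimics Avro's schema-resolution rules, which is why decoding every record against the reader's schema keeps the output compatible with a single AvroCoder even when records were written under an older schema.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Conceptual model of Avro schema resolution (illustrative names only):
// a "schema" is an ordered map of field name -> default value,
// where null means "no default".
public class SchemaResolutionSketch {

  // Resolve a record written under one schema against a reader schema:
  // keep only fields the reader knows, filling missing ones from the
  // reader schema's defaults, as Avro does for added fields.
  static Map<String, Object> resolve(
      Map<String, Object> writtenRecord, Map<String, Object> readerSchema) {
    Map<String, Object> result = new LinkedHashMap<>();
    for (Map.Entry<String, Object> field : readerSchema.entrySet()) {
      if (writtenRecord.containsKey(field.getKey())) {
        result.put(field.getKey(), writtenRecord.get(field.getKey()));
      } else if (field.getValue() != null) {
        // Field was added after the record was written: use the default.
        result.put(field.getKey(), field.getValue());
      } else {
        throw new IllegalStateException("No value or default for " + field.getKey());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // Reader schema v2: "name" (no default) plus a new "age" field defaulting to 0.
    Map<String, Object> readerV2 = new LinkedHashMap<>();
    readerV2.put("name", null);
    readerV2.put("age", 0);

    // A record written under the old v1 schema, which had only "name".
    Map<String, Object> writtenV1 = new LinkedHashMap<>();
    writtenV1.put("name", "alice");

    Map<String, Object> decoded = resolve(writtenV1, readerV2);
    System.out.println(decoded); // {name=alice, age=0}
  }
}
```

This is the property the patch relies on: by passing the reader schema into the deserializer, every record, whatever version it was written with, comes out shaped like the single schema the AvroCoder was built from.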



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
