[ 
https://issues.apache.org/jira/browse/FLINK-24544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17431204#comment-17431204
 ] 

Peter Schrott edited comment on FLINK-24544 at 10/20/21, 1:36 PM:
------------------------------------------------------------------

The underlying problem with deserialization of records with enums from Kafka & 
schema registry lies in the initialization of {{GenericDatumReader}}:

*Case Kafka & SR:*

In {{AvroDeserializationSchema.java}} the {{GenericDatumReader}} is initialized 
with {{writerSchema = null}} and {{readerSchema = the schema derived from the 
table DDL}}
 -> When calling {{RegistryAvroDeserializationSchema.deserialize(.)}}, 
{{datumReader.setSchema()}} sets the attribute {{actual}} to the actual avro 
schema from the registry, whereas {{expected}} is already set to {{readerSchema}}
 -> The inequality of {{actual}} and {{expected}} causes the exception on 
deserializing, as the types of {{actual}} and {{expected}} do not match

--> Root of this is: the initialization of the {{DeserializationSchema}} in 
{{RegistryAvroFormatFactory.java}} uses the {{rowType}} and 
{{ConfluentRegistryAvroDeserializationSchema.forGeneric(.)}} when creating the 
{{ConfluentRegistryAvroDeserializationSchema}}
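
The effect of this initialization can be sketched with a stdlib-only toy model 
(the class {{ToyDatumReader}}, the demo class, and the schema strings below are 
made up for illustration; this is NOT the real 
{{org.apache.avro.generic.GenericDatumReader}}, and real schemas are not plain 
strings):

```java
// Illustrative toy model only -- NOT the real org.apache.avro.generic.GenericDatumReader.
// Schemas are simplified to plain strings.
final class ToyDatumReader {
    private String actual;   // writer schema
    private String expected; // reader schema

    ToyDatumReader(String writerSchema, String readerSchema) {
        this.actual = writerSchema;
        this.expected = readerSchema;
    }

    // Mirrors datumReader.setSchema(...): updates the writer side; the reader
    // side only falls back to it when no reader schema was given at construction.
    void setSchema(String writerSchema) {
        this.actual = writerSchema;
        if (this.expected == null) {
            this.expected = writerSchema;
        }
    }

    // In this toy model, resolution succeeds only when both sides agree.
    boolean canResolve() {
        return actual != null && actual.equals(expected);
    }
}

public class SchemaMismatchDemo {
    public static void main(String[] args) {
        // Kafka & SR: expected is fixed from the table DDL, actual arrives later
        // from the schema registry -- the two sides never line up.
        ToyDatumReader reader = new ToyDatumReader(null, "schema-from-table-ddl");
        reader.setSchema("schema-from-registry-with-enum");
        System.out.println("resolves: " + reader.canResolve()); // prints "resolves: false"
    }
}
```

In this model the pre-set {{expected}} blocks the fallback in {{setSchema}}, 
which is exactly the shape of the Kafka & SR failure described above.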

*Case FS:*

In {{AvroInputFormat.java}} the {{GenericDatumReader}} is initialized with 
{{writerSchema = null}} and {{readerSchema = null}}
 -> During initialization of {{DataFileStream}}, {{reader.setSchema(.)}} is 
called with the actual avro schema, so in the {{GenericDatumReader}} both 
attributes {{expected}} and {{actual}} are set to the passed value
 -> The avro schema is taken from the file
 -> The equality of {{actual}} and {{expected}} means the serialized data can 
be read from the file
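
The same toy-model style can sketch the filesystem path (again, the class 
{{ToyFileDatumReader}}, the demo class, and the schema string are made up for 
illustration; this is NOT the real Avro {{DataFileStream}} or 
{{GenericDatumReader}}):

```java
// Illustrative toy model only -- NOT the real Avro DataFileStream/GenericDatumReader.
// Schemas are simplified to plain strings.
final class ToyFileDatumReader {
    private String actual;   // writer schema
    private String expected; // reader schema

    // Both sides start out null, as in AvroInputFormat.
    ToyFileDatumReader() {
        this.actual = null;
        this.expected = null;
    }

    // Mirrors DataFileStream handing the schema from the file header to the
    // reader: since no reader schema was set, expected falls back to actual.
    void setSchema(String fileSchema) {
        this.actual = fileSchema;
        if (this.expected == null) {
            this.expected = fileSchema;
        }
    }

    boolean canResolve() {
        return actual != null && actual.equals(expected);
    }
}

public class FileSchemaDemo {
    public static void main(String[] args) {
        ToyFileDatumReader reader = new ToyFileDatumReader(); // both schemas null
        reader.setSchema("schema-from-avro-file-header");     // taken from the file
        System.out.println("resolves: " + reader.canResolve()); // prints "resolves: true"
    }
}
```

Because {{expected}} starts out null, the fallback fires and both sides end up 
equal, which is why the same data deserializes fine from a file.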



> Failure when using Kafka connector in Table API with Avro and Confluent 
> schema registry 
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-24544
>                 URL: https://issues.apache.org/jira/browse/FLINK-24544
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Formats (JSON, Avro, Parquet, ORC, 
> SequenceFile), Table SQL / Ecosystem
>    Affects Versions: 1.13.1
>            Reporter: Francesco Guardiani
>            Priority: Major
>         Attachments: flink-deser-avro-enum.zip
>
>
> A user reported in the [mailing 
> list|https://lists.apache.org/thread.html/re38a07f6121cc580737a20c11574719cfe554e58d99817f79db9bb4a%40%3Cuser.flink.apache.org%3E]
>  that Avro deserialization fails when using Kafka, Avro and Confluent Schema 
> Registry:  
> {code:java}
> Caused by: java.io.IOException: Failed to deserialize Avro record.
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
>   
>   at 
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>   at 
> org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
>  
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
> Caused by: org.apache.avro.AvroTypeException: Found my.type.avro.MyEnumType, 
> expecting union
>   at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
>   at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
>   at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
>   at 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>   at 
> org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
>   at 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
>   at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>   at 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>   at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>   at 
> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
>   at 
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>   at 
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>   at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>   at 
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>   at 
> org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
>   at 
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
>   ... 9 more
> {code}
> See the attachments for a reproducer.
> The same data serialized to a file works fine (see the filesystem example in 
> the reproducer).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
