[ https://issues.apache.org/jira/browse/FLINK-28467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jaya Ananthram updated FLINK-28467:
-----------------------------------
    Summary: Flink Table API Avro format fails during schema evolution  (was: AVRO Table API fails during schema evolution)

> Flink Table API Avro format fails during schema evolution
> ---------------------------------------------------------
>
>                 Key: FLINK-28467
>                 URL: https://issues.apache.org/jira/browse/FLINK-28467
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.14.4
>            Reporter: Jaya Ananthram
>            Priority: Major
>         Attachments: image-2022-07-08-15-01-49-648.png
>
>
> It looks like the Flink Table API fails during schema evolution, i.e. when we add a new optional field on the producer side, the consumer (Flink Table API) fails to parse the message with the old schema. The following are the exact steps to reproduce:
>  # Create a schema X with two fields
>  # Send five messages to Kafka using schema X
>  # Update schema X by adding one new optional field with default NULL (at the last position), producing schema Y (see the sketch after this list)
>  # Send five messages to Kafka using schema Y
>  # Create a Flink SQL job to consume all 10 messages using schema X (with two fields)
>  # It fails at exactly the 7th message with the exception *_Malformed data. Length is negative: -xx_* (the 6th message passes successfully, though).
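> For illustration, here is a minimal sketch of the two schemas, parsed with the plain Avro API (the record and field names are hypothetical; the grounded details are only that X has two fields and Y appends one optional field defaulting to NULL):
> {code:java}
> import org.apache.avro.Schema;
>
> // Hypothetical schema X: the original record with two fields.
> Schema schemaX = new Schema.Parser().parse(
>     "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
>         + "{\"name\":\"id\",\"type\":\"string\"},"
>         + "{\"name\":\"value\",\"type\":\"string\"}]}");
>
> // Hypothetical schema Y: the same record plus one optional field at the
> // last position, defaulting to null (a backward-compatible change under
> // Avro schema resolution).
> Schema schemaY = new Schema.Parser().parse(
>     "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
>         + "{\"name\":\"id\",\"type\":\"string\"},"
>         + "{\"name\":\"value\",\"type\":\"string\"},"
>         + "{\"name\":\"extra\",\"type\":[\"null\",\"string\"],\"default\":null}]}");
> {code}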
> The complete stack trace is available below,
> {code:java}
> Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -56
>     at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:285)
>     at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208)
>     at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469)
>     at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>     at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>     at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142)
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
>     ... 19 more
> {code}
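> For context, the consuming job in step 5 might be declared roughly as follows (a sketch only; the topic, table, and column names are hypothetical, and the columns mirror schema X):
> {code:java}
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableEnvironment;
>
> TableEnvironment tEnv =
>     TableEnvironment.create(EnvironmentSettings.inStreamingMode());
> // The 'avro' format derives its Avro schema from these columns, so the
> // job only ever knows schema X.
> tEnv.executeSql(
>     "CREATE TABLE events ("
>         + "  id STRING,"
>         + "  `value` STRING"
>         + ") WITH ("
>         + "  'connector' = 'kafka',"
>         + "  'topic' = 'events',"
>         + "  'properties.bootstrap.servers' = 'localhost:9092',"
>         + "  'scan.startup.mode' = 'earliest-offset',"
>         + "  'format' = 'avro'"
>         + ")");
> tEnv.executeSql("SELECT * FROM events").print();
> {code}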
>  
> From the [Avro specification|https://avro.apache.org/docs/1.10.0/spec.html] (refer to the section "Single object encoding" or the attached image), it looks like Avro provides schema evolution support by default, so, as an end user, I expect the Flink Table API to provide the same functionality. I can also confirm that the old schema is able to decode all 10 messages outside of Flink (i.e. using a simple hello-world Avro deserializer).
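> As a minimal sketch of that outside-of-Flink check (reusing the hypothetical schemas X and Y from above): when plain Avro is given both the writer schema (Y) and the reader schema (X), schema resolution handles the extra trailing field:
> {code:java}
> import java.io.ByteArrayOutputStream;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericDatumReader;
> import org.apache.avro.generic.GenericDatumWriter;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.avro.io.BinaryEncoder;
> import org.apache.avro.io.Decoder;
> import org.apache.avro.io.DecoderFactory;
> import org.apache.avro.io.EncoderFactory;
>
> // Encode a record with the new schema Y, as the producer would.
> GenericRecord written = new GenericData.Record(schemaY);
> written.put("id", "7");
> written.put("value", "hello");
> written.put("extra", null);
> ByteArrayOutputStream out = new ByteArrayOutputStream();
> BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
> new GenericDatumWriter<GenericRecord>(schemaY).write(written, encoder);
> encoder.flush();
>
> // Decode with the old schema X as the reader schema. Because the reader
> // also knows the writer schema Y, Avro's schema resolution skips the
> // trailing optional field and the decode succeeds.
> Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
> GenericRecord read =
>     new GenericDatumReader<GenericRecord>(schemaY, schemaX).read(null, decoder);
> {code}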
> I am adding the root cause as a comment, as I am not exactly sure whether my finding is correct.
> *Note:* I am marking this as a Major ticket because we have another open ticket ([here|https://issues.apache.org/jira/browse/FLINK-20091]) for the missing functionality to ignore parse failures. This means that even if the user is ready to lose some messages (when a batch contains both kinds of messages), there is still no property we can set to ignore them (so that the DDL could be updated once the switch-over messages have passed). So it looks like the Avro Table API can't be used in production when the schema is expected to change, and I assume that in most cases the schema is expected to change. If the severity is not correct, please feel free to reduce the priority.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
