Jaya Ananthram created FLINK-28467:
--------------------------------------

             Summary: AVRO Table API fails during schema evolution
                 Key: FLINK-28467
                 URL: https://issues.apache.org/jira/browse/FLINK-28467
             Project: Flink
          Issue Type: Bug
          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
    Affects Versions: 1.14.4
            Reporter: Jaya Ananthram
         Attachments: image-2022-07-08-15-01-49-648.png

The Flink Table API appears to fail during schema evolution: when a new 
optional field is added on the producer side, the consumer (Flink Table API) 
fails to parse the new messages with the old schema. The exact steps to 
reproduce are:
 # Create a schema X with two fields
 # Send five messages to Kafka using schema X
 # Update schema X to add one new optional field with default NULL (in the 
last position); call the result schema Y
 # Send five messages to Kafka using schema Y
 # Create a Flink SQL job to consume all 10 messages using schema X (with 
two fields)
 # The job fails at exactly the 7th message with the exception *_Malformed 
data. Length is negative: -xx_* (the 6th message is parsed successfully, though).
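
The steps above can be illustrated without Flink or Kafka. The following is a 
minimal sketch of a hand-rolled subset of Avro's binary encoding (zigzag varint 
longs, length-prefixed strings, union branch index). It is not the Avro library 
and not Flink code; the class name, the sample values, and the choice of two 
string fields for schema X are assumptions made for illustration. It shows that 
a record written with schema Y and read with schema X alone leaves the union 
bytes unread, because the encoding carries no field markers. It also shows that 
the single byte 0x6F (ASCII 'o'), if read as a string length, decodes to -56, 
the value reported in the stack trace; whether that is what happens inside 
Flink is not established here.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hand-rolled subset of Avro binary encoding (illustrative only). */
final class AvroMisalignmentSketch {

    /** Avro long: zigzag, then base-128 varint with the low 7-bit group first. */
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    static long readLong(ByteBuffer in) {
        long z = 0;
        int shift = 0;
        int b;
        do {
            b = in.get() & 0xFF;
            z |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (z >>> 1) ^ -(z & 1); // undo zigzag
    }

    /** Avro string: byte length as a long, then the UTF-8 bytes. */
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    static String readString(ByteBuffer in) {
        long len = readLong(in);
        if (len < 0) {
            throw new IllegalStateException("Malformed data. Length is negative: " + len);
        }
        byte[] utf8 = new byte[(int) len];
        in.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }

    /** Assumed schema Y: two strings, then a union {null, string}. */
    static byte[] encodeY(String f1, String f2, String f3) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, f1);
        writeString(out, f2);
        if (f3 == null) {
            writeLong(out, 0); // union branch 0 = null, no payload
        } else {
            writeLong(out, 1); // union branch 1 = string
            writeString(out, f3);
        }
        return out.toByteArray();
    }

    /** Reads two strings (assumed schema X) and returns the number of unread bytes. */
    static int decodeAsX(ByteBuffer in) {
        readString(in);
        readString(in);
        return in.remaining();
    }
}
```

For example, `decodeAsX(ByteBuffer.wrap(encodeY("id-6", "hello", null)))` 
reads both strings without error but leaves one byte (the union branch index) 
unread. A reader that knows only schema X has no way to tell that this byte 
belongs to the record it just decoded.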

The complete stack trace is below:
{code:java}
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -56
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:285)
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
    ... 19 more
{code}
 

From the [Avro specification|https://avro.apache.org/docs/1.10.0/spec.html] 
(see the section "Single object Encoding" or the attached image), Avro appears 
to support schema evolution by default, so as an end user I expect the Flink 
Table API to provide the same functionality. I can also confirm that the old 
schema is able to decode all 10 messages outside of Flink (i.e., using a 
simple hello-world AvroDeserializer).
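
For reference, the single-object encoding described in that section of the 
specification prefixes each record with a two-byte marker (C3 01) and the 
8-byte little-endian CRC-64-AVRO fingerprint of the writer's schema, so a 
reader can look up the schema the data was written with before resolving it 
against its own. Below is a minimal sketch of reading that header; the class 
and method names are hypothetical, and whether the Kafka messages in this 
report carry such a header is not stated.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Parses the single-object encoding header (illustrative only). */
final class SingleObjectHeaderSketch {

    /** Two marker bytes plus an eight-byte schema fingerprint. */
    static final int HEADER_LENGTH = 10;

    /** True if the message starts with the marker bytes C3 01. */
    static boolean hasMarker(byte[] message) {
        return message.length >= HEADER_LENGTH
                && (message[0] & 0xFF) == 0xC3
                && (message[1] & 0xFF) == 0x01;
    }

    /** Returns the writer-schema fingerprint stored little-endian after the marker. */
    static long fingerprint(byte[] message) {
        if (!hasMarker(message)) {
            throw new IllegalArgumentException("Not a single-object encoded message");
        }
        return ByteBuffer.wrap(message, 2, 8).order(ByteOrder.LITTLE_ENDIAN).getLong();
    }
}
```

A consumer could use the fingerprint as a key into a schema store to pick the 
writer schema for each message; a message without the marker gives the reader 
no such hint.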

I am adding the root cause as a comment, as I am not exactly sure whether my 
finding is correct.

*Note:* I am marking this as a Major ticket because another open ticket 
([here|https://issues.apache.org/jira/browse/FLINK-20091]) covers the missing 
ability to ignore failures. This means that even if the user is willing to 
lose some messages (when a batch contains both types of messages), there is no 
property to ignore the failures (and then update the DDL once consumption has 
passed the switch-over messages). So it looks like the Avro Table API can't be 
used in PROD when the schema is expected to change, and I assume that in most 
cases the schema is expected to change. If the severity is not correct, please 
feel free to reduce the priority.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
