Jaya Ananthram created FLINK-28467:
--------------------------------------

Summary: AVRO Table API fails during schema evolution
Key: FLINK-28467
URL: https://issues.apache.org/jira/browse/FLINK-28467
Project: Flink
Issue Type: Bug
Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.14.4
Reporter: Jaya Ananthram
Attachments: image-2022-07-08-15-01-49-648.png
It looks like the Flink Table API fails during schema evolution. That is, when we add a new optional field on the producer side, the consumer (Flink Table API) using the old schema fails to parse the messages.

The following are the exact steps to reproduce:
# Create a schema X with two fields.
# Send five messages to Kafka using schema X.
# Evolve schema X into schema Y by adding one new optional field with default NULL (in the last position).
# Send five messages to Kafka using schema Y.
# Create a Flink SQL job that consumes all 10 messages using schema X (with two fields).
# The job fails exactly at the 7th message with the exception *_Malformed data. Length is negative: -xx_* (the 6th message, which is the first one written with schema Y, is parsed successfully though).

The complete stack trace is below:
{code:java}
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -56
	at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:285)
	at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208)
	at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469)
	at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
	at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142)
	at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
	... 19 more
{code}

According to the [Avro specification|https://avro.apache.org/docs/1.10.0/spec.html] (refer to the section "Single object encoding" or the attached image), Avro supports schema evolution by default, so as an end user I expect the Flink Table API to provide the same functionality. I can also confirm that the old schema is able to decode all 10 messages outside of Flink (i.e., using a simple hello-world Avro deserializer).

I am adding the root cause as a comment, as I am not entirely sure whether my finding is correct.

*Note:* I am marking this as a Major ticket because of another open ticket ([here|https://issues.apache.org/jira/browse/FLINK-20091]) about the missing ability to ignore parse failures. This means that even if users are willing to skip some messages (when a batch contains messages of both schemas), they cannot set a property to ignore them (and then update the DDL once the job has passed the switch-over messages). So it looks like the Avro Table API cannot be used in production when the schema is expected to change, which I assume is the case in most deployments. If the severity is not correct, please feel free to reduce the priority.
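The "6th message passes, 7th fails" pattern can be illustrated at the byte level. Below is a minimal, stdlib-only Java sketch. It does not use the Avro library; it hand-rolls the binary encoding rules from the Avro specification (zig-zag varint longs, length-prefixed UTF-8 strings, unions written as a branch index followed by the value). The class `AvroEvolutionSketch`, its helper methods, the field names (`name`, `city`, `email`) and the message contents are all hypothetical. Because Avro binary data carries no field tags, a reader using schema X consumes only the first two fields of a schema-Y record. Decoding each message from its own buffer (comparable to the stand-alone deserializer mentioned above) therefore succeeds for all 10 messages, whereas a single buffer whose read position carries over between messages decodes message 6 and then misreads its leftover bytes as the start of message 7. This is an assumed mechanism for illustration only, not a confirmed description of what `AvroRowDataDeserializationSchema` does internally.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Illustration only: hand-rolled Avro binary encoding, NOT the Avro library. Hypothetical
 * schemas: X = {name: string, city: string}; Y = X + {email: ["null","string"], default null}.
 */
public class AvroEvolutionSketch {

    // Avro int/long: zig-zag encoding, then a base-128 varint.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Avro string: byte count (encoded as a long) followed by the UTF-8 bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, bytes.length);
        out.write(bytes, 0, bytes.length);
    }

    static long readLong(ByteBuffer buf) {
        long z = 0;
        int b, shift = 0;
        do {
            b = buf.get() & 0xFF;
            z |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (z >>> 1) ^ -(z & 1); // undo zig-zag
    }

    static String readString(ByteBuffer buf) {
        long len = readLong(buf);
        if (len < 0) { // wording mirrors the Avro error in the stack trace, for comparison
            throw new IllegalStateException("Malformed data. Length is negative: " + len);
        }
        byte[] bytes = new byte[(int) len];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Messages 1-5 are written with X; messages 6-10 with Y (email set: union branch 1).
    static List<byte[]> produceMessages() {
        List<byte[]> messages = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            writeString(out, "user" + i);
            writeString(out, "Berlin");
            if (i > 5) {
                writeLong(out, 1); // union branch index 1 = "string"
                writeString(out, "u" + i + "@example.com");
            }
            messages.add(out.toByteArray());
        }
        return messages;
    }

    // Reader with schema X: consumes "name" and "city" only, returns "name".
    static String readWithSchemaX(ByteBuffer buf) {
        String name = readString(buf);
        readString(buf); // city
        return name;
    }

    // Fresh buffer per message: the trailing "email" bytes are simply never read.
    static List<String> decodeIndependently(List<byte[]> messages) {
        List<String> names = new ArrayList<>();
        for (byte[] m : messages) names.add(readWithSchemaX(ByteBuffer.wrap(m)));
        return names;
    }

    // One buffer whose position carries over between messages: bytes left unread in
    // message 6 are misread as the start of message 7.
    static String decodeSequentially(List<byte[]> messages) {
        ByteArrayOutputStream all = new ByteArrayOutputStream();
        for (byte[] m : messages) all.write(m, 0, m.length);
        ByteBuffer buf = ByteBuffer.wrap(all.toByteArray());
        int decoded = 0;
        try {
            for (byte[] ignored : messages) {
                readWithSchemaX(buf);
                decoded++;
            }
            return "decoded all " + decoded + " messages";
        } catch (IllegalStateException e) {
            return "decoded " + decoded + " messages, then: " + e.getMessage();
        }
    }
}
```

With this hypothetical data, `decodeSequentially(produceMessages())` returns `decoded 6 messages, then: Malformed data. Length is negative: -59`, while `decodeIndependently` returns all ten names. The negative number differs from the -56 in the trace because it depends entirely on which leftover bytes are misread.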