[
https://issues.apache.org/jira/browse/FLINK-28467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jaya Ananthram updated FLINK-28467:
-----------------------------------
Summary: Flink Table API Avro format fails during schema evolution (was:
AVRO Table API fails during schema evolution)
> Flink Table API Avro format fails during schema evolution
> ---------------------------------------------------------
>
> Key: FLINK-28467
> URL: https://issues.apache.org/jira/browse/FLINK-28467
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.14.4
> Reporter: Jaya Ananthram
> Priority: Major
> Attachments: image-2022-07-08-15-01-49-648.png
>
>
> It looks like the Flink Table API fails during schema evolution, i.e.,
> when we add a new optional field on the producer side, the consumer (Flink
> Table API) fails to parse the messages with the old schema. The following is
> the exact scenario to reproduce:
> # Create a schema X with two fields
> # Send five messages to Kafka using schema X
> # Update schema X to add one new optional field with default NULL (at the
> last position); call the result schema Y
> # Send five messages to Kafka using schema Y
> # Create a Flink SQL job to consume all 10 messages using schema X (with
> two fields)
> # The job fails exactly at the 7th message with the exception *_Malformed
> data. Length is negative: -xx_* (the 6th message is parsed successfully,
> though).
> The complete stack trace is below:
> {code:java}
> Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -56
>     at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:285)
>     at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208)
>     at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469)
>     at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>     at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>     at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142)
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
>     ... 19 more
> {code}
>
> From the [Avro specification|https://avro.apache.org/docs/1.10.0/spec.html]
> (refer to the section "Single object Encoding" or the attached image), it
> looks like Avro provides schema evolution support by default, so as an
> end user I expect the Flink Table API to provide the same functionality. I
> can also confirm that the old schema is able to decode all 10 messages
> outside of Flink (i.e., using a simple hello-world AvroDeserializer).
> I am adding the root cause as a comment, as I am not exactly sure whether my
> finding is correct.
> *Note:* I am marking this as a Major ticket because another open ticket
> ([here|https://issues.apache.org/jira/browse/FLINK-20091]) covers the
> missing ability to ignore failures. This means that even if the user is
> willing to lose some messages (when a batch contains both types of
> messages), we still can't set a property to ignore the failures (and update
> the DDL once the job has passed the switch-over messages). So it looks like
> the Avro Table API can't be used in PROD when the schema is expected to
> change, and I assume that in most cases the schema is expected to change.
> If the severity is not correct, please feel free to reduce the priority.
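The reproduction steps in the description can be illustrated without Flink or
the Avro library. The following is a minimal Python sketch, assuming a
hand-rolled subset of the Avro binary encoding (zigzag varint longs,
length-prefixed strings, index-prefixed unions). The field values, the helper
names (encode_y, decode_x, decode_y_as_x, replay), and the idea that bytes
left unread after one message carry over into the next read are all
assumptions made for illustration; the ticket does not confirm that this is
the root cause in Flink.

```python
def write_long(n: int) -> bytes:
    """Avro long: zigzag-encode, then emit 7 bits per byte, low bits first."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while True:
        low = z & 0x7F
        z >>= 7
        if z:
            out.append(low | 0x80)  # continuation bit: more bytes follow
        else:
            out.append(low)
            return bytes(out)


def read_long(buf: bytes, pos: int):
    """Decode one zigzag varint long; return (value, next position)."""
    z, shift = 0, 0
    while True:
        b = buf[pos]
        pos += 1
        z |= (b & 0x7F) << shift
        if not b & 0x80:
            return (z >> 1) ^ -(z & 1), pos
        shift += 7


def write_string(s: str) -> bytes:
    data = s.encode("utf-8")
    return write_long(len(data)) + data


def read_string(buf: bytes, pos: int):
    n, pos = read_long(buf, pos)
    if n < 0:
        raise ValueError(f"Malformed data. Length is negative: {n}")
    return buf[pos:pos + n].decode("utf-8"), pos + n


def encode_y(a: str, b: str, c):
    """Schema Y (hypothetical): two strings plus a ["null", "string"] union."""
    tail = write_long(0) if c is None else write_long(1) + write_string(c)
    return write_string(a) + write_string(b) + tail


def decode_x(buf: bytes, pos: int = 0):
    """Read with schema X only (two strings), ignoring the writer schema."""
    a, pos = read_string(buf, pos)
    b, pos = read_string(buf, pos)
    return (a, b), pos


def decode_y_as_x(buf: bytes, pos: int = 0):
    """Read with writer schema Y, then project to X by skipping the union."""
    rec, pos = decode_x(buf, pos)
    branch, pos = read_long(buf, pos)
    if branch == 1:
        _, pos = read_string(buf, pos)
    return rec, pos


def replay():
    """Assumed scenario: two Y-encoded messages read back to back with X."""
    stream = encode_y("id-6", "alice", "abc") + encode_y("id-7", "bob", "abc")
    first, pos = decode_x(stream, 0)  # succeeds, but leaves the union unread
    try:
        decode_x(stream, pos)  # starts inside the leftover union bytes
        error = None
    except ValueError as exc:
        error = str(exc)
    return first, pos, error
```

Under these assumptions, the first Y-encoded message decodes cleanly with
schema X and the error only appears on the following read, which resembles
the 6th/7th message behaviour in the steps above. The exact negative value
depends on the payload bytes. Reading with the writer schema and projecting
to X (decode_y_as_x) consumes the whole message, which is the behaviour that
Avro schema resolution is meant to provide.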
--
This message was sent by Atlassian Jira
(v8.20.10#820010)