Jaya Ananthram created FLINK-28467:
--------------------------------------
Summary: AVRO Table API fails during schema evolution
Key: FLINK-28467
URL: https://issues.apache.org/jira/browse/FLINK-28467
Project: Flink
Issue Type: Bug
Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.14.4
Reporter: Jaya Ananthram
Attachments: image-2022-07-08-15-01-49-648.png
It looks like the Flink Table API fails during schema evolution, i.e., when we add a new optional field on the producer side, the consumer (Flink Table API) fails to parse the messages using the old schema. The exact steps to reproduce are as follows:
# Create a schema X with two fields.
# Send five messages to Kafka using schema X.
# Update schema X by adding one new optional field with default NULL (in the last position); call the result schema Y.
# Send five messages to Kafka using schema Y.
# Create a Flink SQL job that consumes all 10 messages using schema X (with two fields).
# The job fails exactly at the 7th message with the exception *_Malformed data. Length is negative: -xx_* (the 6th message is parsed successfully, though).
The complete stack trace is below:
{code:java}
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -56
	at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:285)
	at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208)
	at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469)
	at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
	at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142)
	at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
	... 19 more
{code}
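The exception is raised while reading the length prefix of an Avro string. Avro's binary encoding writes a string as a zig-zag varint length followed by its UTF-8 bytes, so if the decoder is positioned anywhere other than the start of a length prefix, an ordinary payload byte is interpreted as a length, and whenever the first byte read has its lowest bit set, the decoded length is negative. The sketch below is a self-contained illustration in plain Java with no Avro dependency; `ZigZagLengthDemo`, `readVarLong` and `encodeShortString` are hypothetical helpers written for this example, not Avro API.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper class illustrating Avro's zig-zag varint length prefix.
public class ZigZagLengthDemo {

    // Decodes one zig-zag varint (the encoding Avro uses for int/long values,
    // including the length prefix of a string) starting at buf[offset].
    static long readVarLong(byte[] buf, int offset) {
        long raw = 0;
        int shift = 0;
        int i = offset;
        int b;
        do {
            b = buf[i++] & 0xFF;                // next byte, as unsigned
            raw |= (long) (b & 0x7F) << shift;  // low 7 bits carry data
            shift += 7;
        } while ((b & 0x80) != 0);              // high bit set: more bytes follow
        return (raw >>> 1) ^ -(raw & 1);        // undo the zig-zag mapping
    }

    // Encodes a string like Avro's binary encoding: zig-zag varint length, then
    // UTF-8 bytes. Only handles strings shorter than 64 bytes, whose length
    // prefix fits into a single varint byte.
    static byte[] encodeShortString(String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        if (utf8.length >= 64) {
            throw new IllegalArgumentException("demo handles strings < 64 bytes only");
        }
        byte[] out = new byte[utf8.length + 1];
        out[0] = (byte) (utf8.length << 1);     // zig-zag of a non-negative n is 2n
        System.arraycopy(utf8, 0, out, 1, utf8.length);
        return out;
    }

    public static void main(String[] args) {
        byte[] encoded = encodeShortString("hello"); // [0x0A, 'h', 'e', 'l', 'l', 'o']
        // Aligned read: the first byte really is the length prefix.
        System.out.println("aligned length = " + readVarLong(encoded, 0));
        // Misaligned read: the payload byte 'o' (0x6F) is treated as a length prefix.
        System.out.println("misaligned length = " + readVarLong(encoded, 5));
    }
}
```

Run as is, it prints an aligned length of 5 and a misaligned length of -56 (0x6F is odd, so it zig-zag decodes to a negative value). The specific number depends only on the illustrative input chosen here.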
From the [Avro specification|https://avro.apache.org/docs/1.10.0/spec.html] (refer to the section "Single-object encoding" or the attached image), it looks like Avro supports schema evolution by default, so as an end user I expect the Flink Table API to provide the same functionality. I can also confirm that the old schema is able to decode all 10 messages outside of Flink (i.e., using a simple hello-world AvroDeserializer).
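The behaviour expected here can be sketched with the plain Avro Java API outside Flink. This is a minimal sketch, assuming `org.apache.avro:avro` (1.10.x) on the classpath; the record name `Event` and the fields `id`, `name` and `extra` are hypothetical, since the report does not name them. `decodeYAsX` passes both the writer schema (Y) and the reader schema (X) to `GenericDatumReader`, so Avro's schema resolution skips the new field. `consumedAllBytesWithXOnly` decodes with X alone, which also succeeds for a single message but leaves the bytes of the new field unread. With raw Avro binary the writer schema is not in the payload, so a real consumer has to obtain it separately (for example via the fingerprint in the single-object encoding header); here it is simply passed in.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class SchemaEvolutionDemo {
    // Hypothetical schema X with two fields (field names are assumptions).
    static final Schema SCHEMA_X = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"string\"},"
            + "{\"name\":\"name\",\"type\":\"string\"}]}");

    // Hypothetical schema Y: X plus an optional field, default null, in the last position.
    static final Schema SCHEMA_Y = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"string\"},"
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"extra\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

    // Producer side: serialize one record with schema Y as raw Avro binary.
    static byte[] encodeWithY(String id, String name, String extra) {
        GenericRecord rec = new GenericData.Record(SCHEMA_Y);
        rec.put("id", id);
        rec.put("name", name);
        rec.put("extra", extra);
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
            new GenericDatumWriter<GenericRecord>(SCHEMA_Y).write(rec, enc);
            enc.flush();
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Schema resolution: writer schema Y, reader schema X; "extra" is skipped.
    static GenericRecord decodeYAsX(byte[] payload) {
        try {
            GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(SCHEMA_Y, SCHEMA_X);
            BinaryDecoder dec = DecoderFactory.get().binaryDecoder(payload, null);
            return reader.read(null, dec);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // No resolution: X is used as both writer and reader schema. Reading stops after
    // the two fields X knows about; returns whether every payload byte was consumed.
    static boolean consumedAllBytesWithXOnly(byte[] payload) {
        try {
            GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(SCHEMA_X);
            BinaryDecoder dec = DecoderFactory.get().binaryDecoder(payload, null);
            reader.read(null, dec);
            return dec.isEnd();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] payload = encodeWithY("42", "alice", "new-value");
        System.out.println(decodeYAsX(payload));              // two-field record shaped like X
        System.out.println(consumedAllBytesWithXOnly(payload)); // bytes of "extra" remain unread
    }
}
```

The second method prints `false` for any Y-encoded payload, including one where `extra` is null, because the union branch index of the new field is still written.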
I am adding the root cause as a comment, as I am not entirely sure whether my finding is correct.
*Note:* I am marking this ticket as Major because of another open ticket ([here|https://issues.apache.org/jira/browse/FLINK-20091]) about the missing functionality to ignore failures. This means that even if users are willing to skip some messages (when a batch contains messages of both schemas), they cannot set a property to ignore them (and then update the DDL once the switch-over messages have been consumed). So it looks like the Avro Table API can't be used in production when schema changes are expected, which I assume is the case in most deployments. If the severity is not correct, please feel free to lower the priority.