[
https://issues.apache.org/jira/browse/FLINK-28467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17564268#comment-17564268
]
Jaya Ananthram edited comment on FLINK-28467 at 7/8/22 12:33 PM:
-----------------------------------------------------------------
*Root cause:* It looks like the byte buffer is not cleared properly when a
message contains a new field that is not in the reader schema. So, as a result,
the 7th message is decoded as a partial 6th message (technically, the new
field) plus a partial 7th message. This happens due to reusing the object
[binaryDecoder|https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java#L165].
If I create a new binaryDecoder object for each message (see the sketch below),
then it works fine locally. I am not 100% sure though, hence marking it as a comment.
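A minimal sketch of what I tried, assuming plain Avro APIs (this is not the actual Flink code; the class and method names are illustrative):
{code:java}
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

// Sketch only: create a fresh BinaryDecoder per message (reuse argument = null),
// so no bytes left over from the previous record can leak into the next read.
public class FreshDecoderSketch {
    public static GenericRecord deserialize(byte[] message, Schema readerSchema) throws Exception {
        GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(readerSchema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(message, null); // no reuse
        return datumReader.read(null, decoder);
    }
}
{code}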
*Note:* For the test case, we need to make sure that at least two messages are
parsed after the schema evolution, because the first message after a schema
change still parses successfully (only the second one fails).
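A rough test sketch under that assumption ({{SCHEMA_X}} and {{encodeWithSchemaY}} are hypothetical helpers, not existing Flink test utilities):
{code:java}
// Hedged test sketch, not an actual Flink test: the failure only surfaces on the
// SECOND record carrying the new field, so deserialize at least two of them.
// SCHEMA_X and encodeWithSchemaY(...) are hypothetical helpers.
AvroDeserializationSchema<GenericRecord> schema =
        AvroDeserializationSchema.forGeneric(SCHEMA_X);
schema.deserialize(encodeWithSchemaY("id-6", "value-6", "extra-6")); // 6th message: passes
schema.deserialize(encodeWithSchemaY("id-7", "value-7", "extra-7")); // 7th message: fails with
// "Malformed data. Length is negative" when the decoder is reused
{code}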
> AVRO Table API fails during schema evolution
> --------------------------------------------
>
> Key: FLINK-28467
> URL: https://issues.apache.org/jira/browse/FLINK-28467
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.14.4
> Reporter: Jaya Ananthram
> Priority: Major
> Attachments: image-2022-07-08-15-01-49-648.png
>
>
> It looks like the Flink Table API fails during schema evolution, i.e. when we
> add a new optional field on the producer side, the consumer (Flink Table API)
> fails to parse the messages with the old schema. The exact steps to reproduce
> are the following (illustrative schemas are sketched after the steps),
> # Create a schema X with two fields
> # Send five messages to Kafka using schema X
> # Update schema X by adding one new optional field with default NULL (at the
> last position); call the result schema Y
> # Send five messages to Kafka using schema Y
> # Create a Flink SQL job to consume all 10 messages using schema X (with two
> fields)
> # It will fail exactly at the 7th message with the exception *_Malformed
> data. Length is negative: -xx_* (the 6th message passes successfully
> though).
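> An illustrative pair of schemas matching these steps (record and field names
> are assumptions, not taken from the actual job):
> {code:java}
> import org.apache.avro.Schema;
>
> // Illustrative only: schema X has two fields; schema Y adds one trailing
> // optional field with default null.
> public class EvolvedSchemas {
>     static final Schema SCHEMA_X = new Schema.Parser().parse(
>             "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
>                     + "{\"name\":\"id\",\"type\":\"string\"},"
>                     + "{\"name\":\"value\",\"type\":\"string\"}]}");
>
>     static final Schema SCHEMA_Y = new Schema.Parser().parse(
>             "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
>                     + "{\"name\":\"id\",\"type\":\"string\"},"
>                     + "{\"name\":\"value\",\"type\":\"string\"},"
>                     + "{\"name\":\"extra\",\"type\":[\"null\",\"string\"],\"default\":null}]}");
> }
> {code}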
> The complete stack trace is available below,
> {code:java}
> Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -56
>     at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:285)
>     at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208)
>     at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469)
>     at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>     at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
>     at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>     at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>     at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142)
>     at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
>     ... 19 more
> {code}
>
> From the [Avro specification|https://avro.apache.org/docs/1.10.0/spec.html]
> (refer to the section "Single object encoding" or the attached image), it
> looks like Avro provides schema evolution support by default, so, as an
> end-user, I expect the Flink Table API to provide the same functionality. I
> can also confirm that the old schema is able to decode all 10 messages
> outside of Flink (i.e. using a simple hello-world Avro deserializer).
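> A minimal sketch of that outside-of-Flink check, assuming the writer schema
> (Y) is known to the reader so that Avro's standard schema resolution applies
> ({{EvolvedSchemas}} refers to the illustrative schemas above):
> {code:java}
> import org.apache.avro.generic.GenericDatumReader;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.avro.io.BinaryDecoder;
> import org.apache.avro.io.DecoderFactory;
>
> // Reads a payload written with schema Y using reader schema X. Avro's schema
> // resolution drops the unknown trailing field, so this succeeds outside Flink.
> public class PlainAvroCheck {
>     public static GenericRecord decodeWithOldSchema(byte[] payload) throws Exception {
>         GenericDatumReader<GenericRecord> reader =
>                 new GenericDatumReader<>(EvolvedSchemas.SCHEMA_Y, EvolvedSchemas.SCHEMA_X);
>         BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
>         return reader.read(null, decoder);
>     }
> }
> {code}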
> I am adding the root cause as a comment, as I am not exactly sure whether my
> finding is correct.
> *Note:* I am marking this as a Major ticket because another open ticket
> ([here|https://issues.apache.org/jira/browse/FLINK-20091]) shows that the
> format lacks an option to ignore parse failures. This means that even if the
> user is willing to lose some messages (when a stream contains messages in
> both schemas), there is no property to ignore the failures and simply update
> the DDL once the stream has passed the switch-over messages. So it looks like
> the Avro Table API can't be used in production whenever we expect the schema
> to change, and in most cases we should expect schemas to change. If the
> severity is not correct, please feel free to reduce the priority.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)