[ https://issues.apache.org/jira/browse/FLINK-28467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jaya Ananthram updated FLINK-28467:
-----------------------------------
Description:
It looks like the Flink Table API fails during schema evolution, i.e., when we
add a new optional field on the producer side, the consumer (Flink Table API)
fails to parse the new messages with the old schema. The exact steps to
reproduce are:
# Create a schema X with two fields.
# Send five messages to Kafka using schema X.
# Update schema X by adding one new optional field with default NULL (in the last position); call the updated schema Y.
# Send five messages to Kafka using schema Y.
# Create a Flink SQL job that consumes all 10 messages using schema X (with two fields).
# The job fails at exactly the 7th message with the exception *_Malformed data. Length is negative: -xx_* (the 6th message, the first one written with schema Y, is parsed successfully).
The complete stack trace is below:
{code:java}
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -56
	at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:285)
	at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208)
	at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469)
	at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
	at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
	at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142)
	at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
	... 19 more
{code}
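To make the byte-level difference between the two schema versions concrete, here is a minimal stdlib-only Java sketch that hand-rolls Avro's binary encoding (zig-zag varints, length-prefixed UTF-8 strings, a varint branch index for unions). It assumes, hypothetically, that schema X is a record of two strings named id and name, and that schema Y appends a field extra of type ["null","string"] with default null; the report does not give the real field names or types (the stack trace only shows readString). The class and method names are also hypothetical. It shows that a schema-Y record is the schema-X bytes plus one trailing union-index byte, that reading with schema X alone stops before that byte, and that a resolving read that walks the writer schema Y (as Avro's schema resolution rules require both writer and reader schema) consumes it.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SchemaEvolutionSketch {

    // Avro binary encoding writes int/long values as zig-zag varints.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63); // zig-zag: small magnitudes become small unsigned values
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80)); // 7 payload bits plus continuation bit
            z >>>= 7;
        }
        out.write((int) z);
    }

    // A string is its UTF-8 byte length (as a long) followed by the bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, bytes.length);
        out.write(bytes, 0, bytes.length);
    }

    // Hypothetical schema X: record { string id; string name; }
    static byte[] encodeX(String id, String name) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, id);
        writeString(out, name);
        return out.toByteArray();
    }

    // Hypothetical schema Y: X plus a last field extra: ["null", "string"], default null.
    static byte[] encodeY(String id, String name) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, id);
        writeString(out, name);
        writeLong(out, 0); // union branch index 0 = "null", which has no payload
        return out.toByteArray();
    }

    // Minimal cursor over one message's bytes.
    static final class Reader {
        final byte[] buf;
        int pos;

        Reader(byte[] buf) { this.buf = buf; }

        long readLong() {
            long z = 0;
            int shift = 0;
            int b;
            do {
                b = buf[pos++] & 0xFF;
                z |= (long) (b & 0x7F) << shift;
                shift += 7;
            } while ((b & 0x80) != 0);
            return (z >>> 1) ^ -(z & 1); // undo zig-zag
        }

        String readString() {
            long len = readLong();
            if (len < 0) throw new IllegalStateException("negative string length: " + len);
            String s = new String(buf, pos, (int) len, StandardCharsets.UTF_8);
            pos += (int) len;
            return s;
        }
    }

    // Reader schema X only: reads the two fields it knows and stops there.
    static List<String> readAsX(Reader r) {
        List<String> rec = new ArrayList<>();
        rec.add(r.readString());
        rec.add(r.readString());
        return rec;
    }

    // Resolving read: walk writer schema Y, keep only the fields X declares.
    static List<String> readYResolvedToX(Reader r) {
        List<String> rec = readAsX(r);
        long branch = r.readLong();
        if (branch == 1) {
            r.readString(); // skip a non-null value of the field X does not know
        }
        return rec;
    }

    public static void main(String[] args) {
        byte[] x = encodeX("k1", "v1");
        byte[] y = encodeY("k1", "v1");
        System.out.println(y.length - x.length); // prints 1

        Reader plain = new Reader(y);
        System.out.println(readAsX(plain) + " unread=" + (y.length - plain.pos)); // prints [k1, v1] unread=1

        Reader resolving = new Reader(y);
        System.out.println(readYResolvedToX(resolving) + " unread=" + (y.length - resolving.pos)); // prints [k1, v1] unread=0
    }
}
```

The sketch only illustrates the encoding; it makes no claim about how Flink's AvroDeserializationSchema handles the unread byte.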
According to the [Avro specification|https://avro.apache.org/docs/1.10.0/spec.html]
(see the section "Single-object encoding" or the attached image), it looks like
Avro supports schema evolution by default, so as an end user I expect the Flink
Table API to provide the same functionality. I can also confirm that the old
schema decodes all 10 messages outside of Flink (i.e., using a simple "hello
world" Avro deserializer).
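The spec section cited above frames each datum with a two-byte marker (C3 01) and the 8-byte little-endian CRC-64-AVRO fingerprint of the writer's schema, which is what lets a reader look up the writer schema before resolving it against its own. The following stdlib Java sketch builds and parses that header; the fingerprint routine follows the reference code given in the spec, while the class name, method names, and the schema string (a hypothetical Parsing Canonical Form of schema X) are assumptions. The report does not say which framing the Kafka producer actually used.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

public class SingleObjectHeaderSketch {

    // Seed of the CRC-64-AVRO (Rabin) fingerprint, as given in the Avro spec.
    static final long EMPTY = 0xc15d213aa4d7a795L;
    static final long[] FP_TABLE = new long[256];

    static {
        for (int i = 0; i < 256; i++) {
            long fp = i;
            for (int j = 0; j < 8; j++) {
                fp = (fp >>> 1) ^ (EMPTY & -(fp & 1L));
            }
            FP_TABLE[i] = fp;
        }
    }

    // Fingerprint over the bytes of a schema's Parsing Canonical Form.
    static long fingerprint64(byte[] buf) {
        long fp = EMPTY;
        for (byte b : buf) {
            fp = (fp >>> 8) ^ FP_TABLE[(int) (fp ^ b) & 0xff];
        }
        return fp;
    }

    // Marker C3 01, then the little-endian fingerprint, then the binary-encoded datum.
    static byte[] frame(String canonicalSchema, byte[] datum) {
        long fp = fingerprint64(canonicalSchema.getBytes(StandardCharsets.UTF_8));
        ByteBuffer buf = ByteBuffer.allocate(10 + datum.length).order(ByteOrder.LITTLE_ENDIAN);
        buf.put((byte) 0xC3).put((byte) 0x01).putLong(fp).put(datum);
        return buf.array();
    }

    // Extracts the writer-schema fingerprint a reader would use to find the writer schema.
    static long readFingerprint(byte[] framed) {
        if (framed.length < 10 || (framed[0] & 0xFF) != 0xC3 || framed[1] != 0x01) {
            throw new IllegalArgumentException("not single-object encoded");
        }
        return ByteBuffer.wrap(framed, 2, 8).order(ByteOrder.LITTLE_ENDIAN).getLong();
    }

    public static void main(String[] args) {
        // Hypothetical canonical form of schema X; not taken from the report.
        String schemaX = "{\"name\":\"X\",\"type\":\"record\",\"fields\":"
                + "[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"}]}";
        byte[] datum = {0x04, 'k', '1', 0x04, 'v', '1'}; // "k1", "v1" in Avro binary encoding
        byte[] framed = frame(schemaX, datum);
        long fp = fingerprint64(schemaX.getBytes(StandardCharsets.UTF_8));
        System.out.println(framed.length); // prints 16
        System.out.println(readFingerprint(framed) == fp); // prints true
    }
}
```

A consumer holding both schema X and schema Y would map each fingerprint to the matching writer schema and then resolve it against its reader schema.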
I will add my root-cause analysis as a comment, since I am not entirely sure
whether my finding is correct.
*Note:* I am marking this ticket as Major because another open ticket
([FLINK-20091|https://issues.apache.org/jira/browse/FLINK-20091]) tracks the
missing ability to ignore parse failures. This means that even if the user is
willing to skip some messages (when a batch contains messages written with both
schemas), there is no property to ignore them (so that the DDL can be updated
once the consumer is past the switch-over messages). As a result, the Avro
format of the Table API cannot be used in production whenever the schema is
expected to change, which I assume is the case for most users. If the severity
is not correct, please feel free to lower the priority.
> Flink Table API Avro format fails during schema evolution
> ---------------------------------------------------------
>
> Key: FLINK-28467
> URL: https://issues.apache.org/jira/browse/FLINK-28467
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.14.4
> Reporter: Jaya Ananthram
> Priority: Major
> Attachments: image-2022-07-08-15-01-49-648.png
--
This message was sent by Atlassian Jira
(v8.20.10#820010)