[ 
https://issues.apache.org/jira/browse/FLINK-28467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jaya Ananthram updated FLINK-28467:
-----------------------------------
    Description: 
It looks like the Flink Table API fails during schema evolution, i.e. when we add a new optional field on the producer side, the consumer (Flink Table API) fails to parse messages with the old schema. The exact scenario to reproduce is as follows (a concrete sketch of the schemas and the consumer DDL appears after the list):
 # Create a schema X with two fields
 # Send five messages to Kafka using schema X
 # Update schema X by adding one new optional field with default NULL (at the last position), producing schema Y
 # Send five messages to Kafka using schema Y
 # Create a Flink SQL job to consume all 10 messages using schema X (with two fields)
 # The job fails exactly at the 7th message with the exception *_Malformed data. Length is negative: -xx_* (the 6th message passes successfully, though).
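For concreteness, here is a minimal sketch of the setup above. The field names, topic, and connector options are illustrative assumptions, not taken from the actual job:
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Hypothetical schema X (two fields) and schema Y (X plus an optional
// field with default null appended at the last position).
String schemaX = "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"string\"},"
        + "{\"name\":\"payload\",\"type\":\"string\"}]}";
String schemaY = "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"string\"},"
        + "{\"name\":\"payload\",\"type\":\"string\"},"
        + "{\"name\":\"extra\",\"type\":[\"null\",\"string\"],\"default\":null}]}";

// The consumer DDL only declares the two columns of schema X.
TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
tEnv.executeSql(
        "CREATE TABLE events ("
        + "  id STRING,"
        + "  payload STRING"
        + ") WITH ("
        + "  'connector' = 'kafka',"
        + "  'topic' = 'events',"
        + "  'properties.bootstrap.servers' = 'localhost:9092',"
        + "  'scan.startup.mode' = 'earliest-offset',"
        + "  'format' = 'avro'"
        + ")");
tEnv.executeSql("SELECT * FROM events").print();
{code}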

The complete stack trace is below:
{code:java}
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -56
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:285)
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
    ... 19 more
{code}
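For what it's worth, one way to reproduce this exact symptom outside of Flink (purely an illustration, not necessarily the root cause of this ticket) is to hand single-object-encoded bytes to a decoder that expects a raw binary record: the leading 0xC3 0x01 marker is consumed as a zig-zag varint string length and decodes to -98:
{code:java}
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

// The two-byte header of Avro's single-object encoding, read as if it
// were the start of a plain binary-encoded record.
byte[] singleObjectMarker = {(byte) 0xC3, (byte) 0x01};
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(singleObjectMarker, null);
// Throws org.apache.avro.AvroRuntimeException:
//   Malformed data. Length is negative: -98
decoder.readString(null);
{code}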
 

From the [Avro specification|https://avro.apache.org/docs/1.10.0/spec.html] (refer to the section "Single object encoding" or the attached image), it looks like Avro provides schema evolution support by default, so as an end user I expect the Flink Table API to provide the same functionality. I can also confirm that the old schema is able to decode all 10 messages outside of Flink (i.e. using a simple hello-world Avro deserializer; see the sketch below).
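That hello-world check can be as simple as the following sketch (using the hypothetical schemaX/schemaY strings from the reproduction steps): as long as the decoder is given the writer schema, Avro's schema resolution happily reads schema-Y bytes with the old schema X.
{code:java}
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import java.io.ByteArrayOutputStream;

Schema writerSchema = new Schema.Parser().parse(schemaY); // new producer schema
Schema readerSchema = new Schema.Parser().parse(schemaX); // old consumer schema

// Encode one record with the new schema Y (all three fields set).
GenericRecord record = new GenericData.Record(writerSchema);
record.put("id", "42");
record.put("payload", "hello");
record.put("extra", "world");
ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
new GenericDatumWriter<GenericRecord>(writerSchema).write(record, encoder);
encoder.flush();

// Decode with the old schema X, crucially passing the writer schema as
// well: resolution drops the unknown trailing field and succeeds.
Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
GenericRecord decoded =
        new GenericDatumReader<GenericRecord>(writerSchema, readerSchema).read(null, decoder);
System.out.println(decoded); // {"id": "42", "payload": "hello"}
{code}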

I am adding the root cause as a comment, as I am not exactly sure whether my 
finding is correct.

*Note:* I am marking this as a Major ticket because we have another open ticket ([here|https://issues.apache.org/jira/browse/FLINK-20091]) for the missing ability to ignore failures. This means that even if the user is willing to miss some messages (when a batch contains two types of messages), we still cannot set a property to skip them (and update the DDL once the job is past the switch-over messages). So it looks like the Avro table format cannot be used in production when we expect the schema to change, and in most cases we should expect the schema to change. If the severity is not correct, please feel free to reduce the priority.
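For comparison, the JSON format already exposes such an escape hatch; the Avro option shown second is only a sketch of what is being asked for and does not exist:
{code:java}
// Real option today: the JSON format can skip undecodable rows.
String jsonOptions = "'format' = 'json', 'json.ignore-parse-errors' = 'true'";

// Hypothetical (does NOT exist in 1.14, see FLINK-20091): an equivalent
// switch for the Avro format, which would let a job survive the
// switch-over messages until the DDL is updated.
String avroOptions = "'format' = 'avro', 'avro.ignore-parse-errors' = 'true'";
{code}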



> Flink Table API Avro format fails during schema evolution
> ---------------------------------------------------------
>
>                 Key: FLINK-28467
>                 URL: https://issues.apache.org/jira/browse/FLINK-28467
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.14.4
>            Reporter: Jaya Ananthram
>            Priority: Major
>         Attachments: image-2022-07-08-15-01-49-648.png
>


