[ 
https://issues.apache.org/jira/browse/FLINK-18049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17123909#comment-17123909
 ] 

Dawid Wysakowicz edited comment on FLINK-18049 at 6/2/20, 3:07 PM:
-------------------------------------------------------------------

The Flink's documentation refers to "Avro type *state*". We can guarantee that 
because we store the writer schema in the checkpoint. So if we restore with a 
new schema we have both the old schema and the new schema.

>From avro docs:
> A reader of Avro data, whether from an RPC or a file, can always parse that 
> data because the original schema *must be provided along with the data*

The part you referenced describes what happens with the value from that field 
of the original schema. It still assumes we have the original schema for the 
record. This part does not talk about fields at the end of the record, so it 
certainly does not describe your case. Just note that every single point in 
that section you linked mentions two schemas, which again must be present when 
deserializing. 



was (Author: dawidwys):
The Flink's documentation refers to "Avro type *state*". We can guarantee that 
because we store the writer schema in the checkpoint. So if we restore with a 
new schema we have both the old schema and the new schema.

>From avro docs:
> A reader of Avro data, whether from an RPC or a file, can always parse that 
> data because the original schema *must be provided along with the data*

The part you referenced describes what happens with the value from that field 
of the original schema. It still assumes we have the original schema for the 
record. This part does not talk about fields at the end of the record, so it 
certainly does not describe your case. Your solution works exclusively for 
appended fields.

> The Flink kafka consumer job will be interrupted if the upstream kafka 
> producer change the AVRO schema
> ------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-18049
>                 URL: https://issues.apache.org/jira/browse/FLINK-18049
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Zheng Hu
>            Priority: Critical
>              Labels: pull-request-available
>
> We have encountered a critical case from online services.  we have the data 
> pipeline:  (producer) -> (kafka) -> (flink consumer job), and all those 
> records are encoded in AVRO format.  Once the producer changed the AVRO 
> schema , says adding an extra column to the existing schema and writing few 
> data into the Kafka. 
> Then the downstream flink job crashed with the following stacktrace: 
> {code}
> ==WARNING==  allocating large 
> array--thread_id[0x00007fccd9c16800]--thread_name[Source: Custom Source 
> (1/1)]--array_size[1590681120 bytes]--array_length[1590681103 elememts]
> os_prio=0 tid=0x00007fccd9c16800 nid=0x226c0 runnable 
>   at org.shaded.apache.avro.util.Utf8.setByteLength(Utf8.java:78)
>   at 
> org.shaded.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:261)
>   at 
> org.shaded.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:272)
>   at 
> org.shaded.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:214)
>   at 
> org.shaded.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:412)
>   at 
> org.shaded.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
>   at 
> org.shaded.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>   at 
> org.shaded.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>   at 
> org.shaded.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>   at 
> org.shaded.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>   at 
> org.shaded.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>   at 
> org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:167)
>   at 
> org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:78)
>   at 
> org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:44)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:192)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:771)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:120)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:74)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:129)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:398)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:736)
>   at java.lang.Thread.run(Thread.java:834)
> {code} 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to