[ https://issues.apache.org/jira/browse/FLINK-18049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17121030#comment-17121030 ]

Zheng Hu commented on FLINK-18049:
----------------------------------

After digging into the code, we found that the AvroRowDeserializationSchema 
has the following bugs: 
1>  It depends on MutableByteArrayInputStream, which extends 
ByteArrayInputStream. ByteArrayInputStream has four fields: buf[], pos, mark 
and count, and MutableByteArrayInputStream#setBuffer resets all of them except 
mark. That means that once the Avro decoder marks a position on the stream, 
the stale mark survives the next setBuffer() call, and a later reset() jumps 
to an unexpected offset in the new byte array (see the first sketch after this 
list). It is a bug, but it does not seem to be the cause of this case. 
2>  We also found that avro's BinaryDecoder will maintain some buf[] inside it 
class.   Say we have two rows:   row_0 = (a, b),  row_1 = (a,b,c), row_2 = 
(a,b,d).    our flink job will use the old schema with two columns to read the 
rows,  when reading the row_1=(a,b,c),  the AvroRowDeserializationSchema would 
only scan the bytes of (a,b) to the upstream and left the bytes of (c) inside 
its internal buf[].   when the row_2=(a,b,d) came,  we actually are reading the 
byte[] filled with (c,a,b,d),  then we are trying to reading the (c,a) to the 
upstream ,  which introduced the stack traces finally. 
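
To make 1> concrete, here is a minimal sketch of the stale-mark problem. The inner class below is a stand-in that resets the same three fields as Flink's MutableByteArrayInputStream#setBuffer, not the actual Flink class, and the demo class name is hypothetical:

{code:java}
import java.io.ByteArrayInputStream;

public class StaleMarkDemo {

    // Stand-in for Flink's MutableByteArrayInputStream: setBuffer() resets
    // buf, pos and count, but NOT the 'mark' field inherited from
    // ByteArrayInputStream.
    static class MutableByteArrayInputStream extends ByteArrayInputStream {
        MutableByteArrayInputStream() {
            super(new byte[0]);
        }

        void setBuffer(byte[] buf) {
            this.buf = buf;
            this.pos = 0;
            this.count = buf.length;
        }
    }

    public static void main(String[] args) {
        MutableByteArrayInputStream in = new MutableByteArrayInputStream();

        in.setBuffer(new byte[]{10, 20, 30, 40});
        in.read();   // pos = 1
        in.read();   // pos = 2
        in.mark(0);  // mark = 2, a position in the OLD buffer

        in.setBuffer(new byte[]{1, 2, 3}); // pos = 0, but mark is still 2
        in.reset();                        // pos jumps back to the stale mark = 2
        System.out.println(in.read());     // prints 3 instead of 1
    }
}
{code}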
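And a minimal sketch of 2>, assuming Avro is on the classpath (using the vanilla org.apache.avro coordinates rather than the shaded ones in the trace). It reuses a single BinaryDecoder over a mutable stream, the same pattern AvroRowDeserializationSchema uses, but with plain longs standing in for the record fields; the class and helper names are hypothetical:

{code:java}
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class LeftoverBytesDemo {

    // Same stand-in as in the first sketch: the stream contents are swapped
    // per Kafka record while the decoder on top of it is reused.
    static class MutableByteArrayInputStream extends ByteArrayInputStream {
        MutableByteArrayInputStream() {
            super(new byte[0]);
        }

        void setBuffer(byte[] buf) {
            this.buf = buf;
            this.pos = 0;
            this.count = buf.length;
        }
    }

    // Encodes the given longs back to back, one Avro varint each.
    static byte[] encodeLongs(long... values) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        for (long v : values) {
            encoder.writeLong(v);
        }
        encoder.flush();
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        MutableByteArrayInputStream in = new MutableByteArrayInputStream();
        // Created once and reused across records, as in
        // AvroRowDeserializationSchema; this decoder buffers the stream
        // contents internally.
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);

        // row_1 = (a=1, b=2, c=3), but the reader only knows two columns.
        in.setBuffer(encodeLongs(1, 2, 3));
        System.out.println(decoder.readLong()); // 1 (a)
        System.out.println(decoder.readLong()); // 2 (b); c's byte stays buffered

        // row_2 = (a=1, b=2, d=4): the stream is swapped, but the decoder
        // serves its leftover byte first, shifting everything by one field.
        in.setBuffer(encodeLongs(1, 2, 4));
        System.out.println(decoder.readLong()); // 3 (the leftover c!)
        System.out.println(decoder.readLong()); // 1 (row_2's a)
    }
}
{code}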

FYI [~twalthr].

> The Flink kafka consumer job will be interrupted if the upstream kafka 
> producer changes the AVRO schema
> ------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-18049
>                 URL: https://issues.apache.org/jira/browse/FLINK-18049
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Zheng Hu
>            Priority: Critical
>
> We have encountered a critical case in our online services. We have the data 
> pipeline: (producer) -> (kafka) -> (flink consumer job), and all the 
> records are encoded in AVRO format. Once the producer changed the AVRO 
> schema, say by adding an extra column to the existing schema, and wrote a few 
> records into Kafka, the downstream Flink job crashed with the following 
> stacktrace: 
> {code}
> ==WARNING==  allocating large array--thread_id[0x00007fccd9c16800]--thread_name[Source: Custom Source (1/1)]--array_size[1590681120 bytes]--array_length[1590681103 elememts]
> os_prio=0 tid=0x00007fccd9c16800 nid=0x226c0 runnable 
>   at org.shaded.apache.avro.util.Utf8.setByteLength(Utf8.java:78)
>   at org.shaded.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:261)
>   at org.shaded.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:272)
>   at org.shaded.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:214)
>   at org.shaded.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:412)
>   at org.shaded.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
>   at org.shaded.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>   at org.shaded.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>   at org.shaded.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>   at org.shaded.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>   at org.shaded.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>   at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:167)
>   at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:78)
>   at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:44)
>   at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:192)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:771)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:120)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:74)
>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:129)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:398)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:736)
>   at java.lang.Thread.run(Thread.java:834)
> {code} 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
