[ https://issues.apache.org/jira/browse/FLINK-18049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17121030#comment-17121030 ]
Zheng Hu commented on FLINK-18049:
----------------------------------
After digging into the code, we found that AvroRowDeserializationSchema has
the following bugs:
1> It depends on MutableByteArrayInputStream, which extends
ByteArrayInputStream. ByteArrayInputStream has four members: buf[], pos, mark
and count, but MutableByteArrayInputStream#setBuffer resets all of them
except mark. This means that if code reading through the stream (for example
the Avro decoder) uses mark()/reset(), the stale mark left over from the
previous buffer sends reset() to the wrong offset, and we read unexpected
bytes. Although this is a bug, it does not seem to be the cause of this case.
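A minimal sketch of the stale-mark problem in 1>, using plain java.io only. The Swappable interface and the LeakyMutableStream / FixedMutableStream classes are hypothetical stand-ins, not Flink's actual MutableByteArrayInputStream; they only model the setBuffer behaviour described above, and the "fix" is one possible approach, not a confirmed patch.

```java
import java.io.ByteArrayInputStream;

public class MarkResetDemo {

    // Hypothetical interface for a stream whose backing array can be swapped.
    interface Swappable {
        void setBuffer(byte[] buf);
    }

    // Models the behaviour described above: buf, pos and count are reset,
    // but the protected 'mark' field keeps its value from the previous buffer.
    static class LeakyMutableStream extends ByteArrayInputStream implements Swappable {
        LeakyMutableStream() { super(new byte[0]); }

        @Override
        public void setBuffer(byte[] newBuf) {
            this.buf = newBuf;
            this.pos = 0;
            this.count = newBuf.length;
        }
    }

    // Hypothetical variant that also resets 'mark', so reset() cannot jump
    // to an offset that only made sense for the old buffer.
    static class FixedMutableStream extends ByteArrayInputStream implements Swappable {
        FixedMutableStream() { super(new byte[0]); }

        @Override
        public void setBuffer(byte[] newBuf) {
            this.buf = newBuf;
            this.pos = 0;
            this.count = newBuf.length;
            this.mark = 0;
        }
    }

    // Marks offset 2 in the first record, swaps in a second record,
    // calls reset() and returns the byte that is read next.
    static <S extends ByteArrayInputStream & Swappable> int firstByteAfterReset(S s) {
        s.setBuffer(new byte[] {10, 11, 12, 13});
        s.skip(2);   // pos = 2
        s.mark(0);   // mark = 2
        s.setBuffer(new byte[] {20, 21, 22, 23});
        s.reset();   // pos = mark
        return s.read();
    }

    public static void main(String[] args) {
        System.out.println("leaky: " + firstByteAfterReset(new LeakyMutableStream())); // 22
        System.out.println("fixed: " + firstByteAfterReset(new FixedMutableStream())); // 20
    }
}
```

With the leaky variant, reset() lands at offset 2 of the new record (byte 22) instead of its start (byte 20).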
2> We also found that Avro's BinaryDecoder maintains an internal buf[]. Say we
have three rows: row_0 = (a,b), row_1 = (a,b,c) and row_2 = (a,b,d), and our
Flink job reads them with the old two-column schema. When reading
row_1 = (a,b,c), AvroRowDeserializationSchema consumes only the bytes of (a,b)
and leaves the bytes of (c) in the decoder's internal buf[]. When
row_2 = (a,b,d) arrives, the decoder is effectively reading (c,a,b,d), so it
decodes (c,a) as the two columns of the row, which eventually produces the
stack trace quoted below.
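The read-ahead effect in 2> can be reproduced without Avro using a simplified model. BufferingReader is a hypothetical stand-in for a buffering decoder, not Avro's BinaryDecoder, and each column is modelled as a single byte (real Avro encodes strings with a length prefix). setSource swaps the input but keeps already-buffered bytes; the hypothetical resetTo also discards them, which corresponds to re-initialising the decoder for every record.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReadAheadDemo {

    // Simplified model of a read-ahead decoder (NOT Avro's BinaryDecoder source).
    // Reads are served from an internal buffer that is refilled from the
    // current source only once the buffer is empty.
    static class BufferingReader {
        private final byte[] buf = new byte[8192];
        private int pos = 0;
        private int limit = 0;
        private ByteArrayInputStream in;

        // Swap the source but keep buffered bytes (the behaviour described above).
        void setSource(ByteArrayInputStream newIn) {
            this.in = newIn;
        }

        // Swap the source and discard buffered bytes (re-initialise per record).
        void resetTo(ByteArrayInputStream newIn) {
            this.in = newIn;
            this.pos = 0;
            this.limit = 0;
        }

        int readByte() {
            if (pos == limit) {
                int n = in.read(buf, 0, buf.length); // read ahead as much as is available
                if (n <= 0) {
                    pos = limit = 0;
                    throw new IllegalStateException("end of stream");
                }
                pos = 0;
                limit = n;
            }
            return buf[pos++];
        }
    }

    // Decodes each record with the OLD reader schema: two one-byte columns.
    static List<String> decode(List<String> records, boolean resetPerRecord) {
        BufferingReader reader = new BufferingReader();
        List<String> out = new ArrayList<>();
        for (String rec : records) {
            ByteArrayInputStream src =
                new ByteArrayInputStream(rec.getBytes(StandardCharsets.US_ASCII));
            if (resetPerRecord) {
                reader.resetTo(src);
            } else {
                reader.setSource(src);
            }
            char first = (char) reader.readByte();
            char second = (char) reader.readByte();
            out.add("" + first + second);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("ab", "abc", "abd"); // row_0, row_1, row_2
        System.out.println(decode(rows, false)); // [ab, ab, ca]
        System.out.println(decode(rows, true));  // [ab, ab, ab]
    }
}
```

Keeping the buffer across records yields (c,a) for row_2, matching the misread described above; discarding it per record keeps every row aligned. For reference, Avro's DecoderFactory documents that decoders from binaryDecoder(InputStream, reuse) may read ahead and buffer input, whereas directBinaryDecoder(InputStream, reuse) does not.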
FYI [~twalthr].
> The Flink kafka consumer job will be interrupted if the upstream kafka
> producer changes the AVRO schema
> ------------------------------------------------------------------------------------------------------
>
> Key: FLINK-18049
> URL: https://issues.apache.org/jira/browse/FLINK-18049
> Project: Flink
> Issue Type: Bug
> Reporter: Zheng Hu
> Priority: Critical
>
> We have encountered a critical case in our online services. We have the data
> pipeline (producer) -> (kafka) -> (flink consumer job), and all records are
> encoded in AVRO format. The producer changed the AVRO schema by adding an
> extra column to the existing schema and wrote a few records into Kafka.
> The downstream Flink job then crashed with the following stack trace:
> {code}
> ==WARNING== allocating large array--thread_id[0x00007fccd9c16800]--thread_name[Source: Custom Source (1/1)]--array_size[1590681120 bytes]--array_length[1590681103 elememts]
> os_prio=0 tid=0x00007fccd9c16800 nid=0x226c0 runnable
> at org.shaded.apache.avro.util.Utf8.setByteLength(Utf8.java:78)
> at org.shaded.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:261)
> at org.shaded.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:272)
> at org.shaded.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:214)
> at org.shaded.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:412)
> at org.shaded.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
> at org.shaded.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
> at org.shaded.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
> at org.shaded.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
> at org.shaded.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> at org.shaded.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
> at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:167)
> at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:78)
> at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:44)
> at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:192)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:771)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:120)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:74)
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:129)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:398)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:736)
> at java.lang.Thread.run(Thread.java:834)
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)