[ https://issues.apache.org/jira/browse/FLINK-18049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17125108#comment-17125108 ]
Seth Wiesman commented on FLINK-18049:
--------------------------------------
I tend to agree with Dawid: this is not how Avro is supposed to be used, and
this seems fragile at best. The proper solution would be to add better SQL
support for schema registries.
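
To illustrate why this is fragile: Avro's binary encoding carries no field
names or tags, so a reader has to know the exact schema the record was written
with. The sketch below is plain Java with no Avro dependency and only mimics
Avro's zigzag-varint and string encodings. The two schemas, the field names,
and the timestamp value are hypothetical. The value was chosen to match the
array_length in the quoted trace, but whether the column actually added was a
timestamp is an assumption.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

class AvroMismatchSketch {

    // Avro writes int/long values as zigzag-encoded variable-length integers.
    static void writeLong(ByteArrayOutputStream out, long value) {
        long z = (value << 1) ^ (value >> 63);
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Avro writes a string as a long byte length followed by the UTF-8 bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // Reads one zigzag varint at pos[0] and advances pos[0] past it.
    static long readLong(byte[] buf, int[] pos) {
        long z = 0;
        int shift = 0;
        int b;
        do {
            b = buf[pos[0]++] & 0xFF;
            z |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (z >>> 1) ^ -(z & 1);
    }

    static String readString(byte[] buf, int[] pos) {
        int len = (int) readLong(buf, pos);
        String s = new String(buf, pos[0], len, StandardCharsets.UTF_8);
        pos[0] += len;
        return s;
    }

    // Hypothetical NEW writer schema: {event_time: long, name: string}.
    static byte[] encodeWithNewSchema(long eventTime, String name) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeLong(out, eventTime);
        writeString(out, name);
        return out.toByteArray();
    }

    // Reader that knows the writer schema: skips event_time, then reads name.
    static String nameSeenByNewReader(byte[] record) {
        int[] pos = {0};
        readLong(record, pos);
        return readString(record, pos);
    }

    // Hypothetical OLD reader schema: {name: string}. It treats the first
    // varint as the string length and would try to allocate that many bytes.
    static long lengthSeenByOldReader(byte[] record) {
        return readLong(record, new int[] {0});
    }

    public static void main(String[] args) {
        byte[] record = encodeWithNewSchema(1590681103L, "alice");
        System.out.println("new reader name: " + nameSeenByNewReader(record));
        System.out.println("old reader string length: " + lengthSeenByOldReader(record));
    }
}
```

With the writer schema available (for example, looked up from a schema
registry), the reader can resolve the two schemas instead of guessing at the
byte layout.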
> The Flink Kafka consumer job will be interrupted if the upstream Kafka
> producer changes the Avro schema
> ------------------------------------------------------------------------------------------------------
>
> Key: FLINK-18049
> URL: https://issues.apache.org/jira/browse/FLINK-18049
> Project: Flink
> Issue Type: Bug
> Reporter: Zheng Hu
> Priority: Critical
> Labels: pull-request-available
>
> We hit a critical failure in a production service. Our data pipeline is
> (producer) -> (kafka) -> (flink consumer job), and all records are encoded
> in Avro. After the producer changed the Avro schema, for example by adding
> an extra column to the existing schema, and wrote a few records to Kafka,
> the downstream Flink job crashed with the following stack trace:
> {code}
> ==WARNING== allocating large array--thread_id[0x00007fccd9c16800]--thread_name[Source: Custom Source (1/1)]--array_size[1590681120 bytes]--array_length[1590681103 elememts]
> os_prio=0 tid=0x00007fccd9c16800 nid=0x226c0 runnable
> at org.shaded.apache.avro.util.Utf8.setByteLength(Utf8.java:78)
> at org.shaded.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:261)
> at org.shaded.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:272)
> at org.shaded.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:214)
> at org.shaded.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:412)
> at org.shaded.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
> at org.shaded.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
> at org.shaded.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
> at org.shaded.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
> at org.shaded.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> at org.shaded.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
> at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:167)
> at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:78)
> at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:44)
> at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:192)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:771)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:120)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:74)
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:129)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:398)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:736)
> at java.lang.Thread.run(Thread.java:834)
> {code}