[ https://issues.apache.org/jira/browse/FLINK-18049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17123939#comment-17123939 ]
Zheng Hu commented on FLINK-18049:
----------------------------------
> Flink's documentation refers to "Avro type state" ...
Got your point. I guess I missed the word "state" and the sections above it in the
post; it's about state compatibility.
> This part does not talk about fields at the end of the record, so it certainly
> does not describe your case.
In my understanding, the section answers the question of whether a reader can
read a record using a schema that differs from the writer's schema. In our
case, the data pipeline is (Avro writer) -> (Kafka) -> (Avro reader on the Flink
side), which I think is an extension of the case described in the document (or,
put another way, we could support Avro schema compatibility without much effort).
> Your solution works exclusively for appended fields.
The unit test covers the added-column case (for verification purposes), but,
contrary to what you said, the solution does not work exclusively for appended
fields. It also supports other schema changes, such as reordering columns,
removing a column that has a default value, and changing a data type from bytes
to string.
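To illustrate the resolution rules behind those schema changes (match fields by name rather than position, ignore writer-only fields, fill reader-only fields from their defaults, promote bytes to string), here is a toy, stdlib-only Java model. It is a simplified sketch under stated assumptions, not Avro's implementation and not the code in the pull request; ToySchemaResolution, Field and all field names are hypothetical. In the Avro Java library, this kind of resolution is what a GenericDatumReader built with both a writer schema and a reader schema performs through ResolvingDecoder (the class that appears in the stack trace quoted below).

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model of Avro-style schema resolution (hypothetical, heavily simplified):
 * values decoded in the WRITER's field order are projected onto the READER's
 * schema by field name, not by position.
 */
class ToySchemaResolution {

    /** Hypothetical field descriptor: name, type ("string", "bytes" or "long"), optional default. */
    static final class Field {
        final String name;
        final String type;
        final boolean hasDefault;
        final Object defaultValue;

        Field(String name, String type, boolean hasDefault, Object defaultValue) {
            this.name = name;
            this.type = type;
            this.hasDefault = hasDefault;
            this.defaultValue = defaultValue;
        }

        static Field of(String name, String type) {
            return new Field(name, type, false, null);
        }

        static Field withDefault(String name, String type, Object defaultValue) {
            return new Field(name, type, true, defaultValue);
        }
    }

    /** Resolves values written in writer-field order into a record laid out in reader-field order. */
    static Map<String, Object> resolve(List<Field> writer, List<Object> written, List<Field> reader) {
        Map<String, Field> writerFields = new HashMap<>();
        Map<String, Object> writtenByName = new HashMap<>();
        for (int i = 0; i < writer.size(); i++) {
            writerFields.put(writer.get(i).name, writer.get(i));
            writtenByName.put(writer.get(i).name, written.get(i));
        }
        Map<String, Object> result = new LinkedHashMap<>();
        for (Field f : reader) {
            Field w = writerFields.get(f.name);
            if (w != null) {
                // Present on both sides: match by name and promote the type if needed.
                result.put(f.name, promote(writtenByName.get(f.name), w.type, f.type));
            } else if (f.hasDefault) {
                // Missing from the writer's schema: fall back to the reader's default.
                result.put(f.name, f.defaultValue);
            } else {
                throw new IllegalArgumentException("Field '" + f.name + "' is missing and has no default");
            }
        }
        // Writer-only fields are never copied into the result, i.e. they are ignored.
        return result;
    }

    /** Only the promotions this example needs: identity and bytes <-> string (UTF-8). */
    static Object promote(Object value, String from, String to) {
        if (from.equals(to)) {
            return value;
        }
        if (from.equals("bytes") && to.equals("string")) {
            return new String((byte[]) value, StandardCharsets.UTF_8);
        }
        if (from.equals("string") && to.equals("bytes")) {
            return ((String) value).getBytes(StandardCharsets.UTF_8);
        }
        throw new IllegalArgumentException("Cannot promote " + from + " to " + to);
    }

    public static void main(String[] args) {
        // Writer: columns reordered, "name" written as bytes, plus a writer-only "zip" column.
        List<Field> writer = List.of(Field.of("city", "string"), Field.of("zip", "string"), Field.of("name", "bytes"));
        List<Object> written = List.of("Berlin", "10115", "flink".getBytes(StandardCharsets.UTF_8));
        // Reader: wants "name" as a string and has a "country" column the writer does not send.
        List<Field> reader = List.of(Field.of("name", "string"), Field.of("city", "string"),
                Field.withDefault("country", "string", "unknown"));
        System.out.println(resolve(writer, written, reader)); // prints {name=flink, city=Berlin, country=unknown}
    }
}
```

Matching by name is what makes reordering safe: a reader that instead assumed the writer's field layout would misinterpret every field after the first difference.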
Thanks.
> The Flink kafka consumer job will be interrupted if the upstream kafka producer change the AVRO schema
> ------------------------------------------------------------------------------------------------------
>
> Key: FLINK-18049
> URL: https://issues.apache.org/jira/browse/FLINK-18049
> Project: Flink
> Issue Type: Bug
> Reporter: Zheng Hu
> Priority: Critical
> Labels: pull-request-available
>
> We have encountered a critical case in our online services. We have the data
> pipeline (producer) -> (Kafka) -> (Flink consumer job), and all records are
> encoded in Avro format. The producer changed the Avro schema, say by adding an
> extra column to the existing schema, and wrote a few records into Kafka.
> Then the downstream Flink job crashed with the following stack trace:
> {code}
> ==WARNING== allocating large array--thread_id[0x00007fccd9c16800]--thread_name[Source: Custom Source (1/1)]--array_size[1590681120 bytes]--array_length[1590681103 elememts]
> os_prio=0 tid=0x00007fccd9c16800 nid=0x226c0 runnable
> at org.shaded.apache.avro.util.Utf8.setByteLength(Utf8.java:78)
> at org.shaded.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:261)
> at org.shaded.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:272)
> at org.shaded.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:214)
> at org.shaded.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:412)
> at org.shaded.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
> at org.shaded.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
> at org.shaded.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
> at org.shaded.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
> at org.shaded.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
> at org.shaded.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
> at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:167)
> at org.apache.flink.formats.avro.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:78)
> at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:44)
> at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:192)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:771)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:120)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:74)
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:129)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:398)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:736)
> at java.lang.Thread.run(Thread.java:834)
> {code}
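One way a warning like array_size[1590681120 bytes] can arise: Avro's binary encoding prefixes each string with a zigzag, base-128 varint length and carries no field names or delimiters, so if a reader starts decoding a string at the wrong byte (for example, because its idea of the record layout does not match what was written), ordinary payload bytes are trusted as a length. The hypothetical, stdlib-only sketch below illustrates that effect; it is not Flink's or Avro's code and does not claim to be the root cause of this issue. MisalignedLengthDemo, its helpers and the sample payload are made up for the example.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical illustration: reading an Avro-style length prefix from the wrong
 * offset turns ordinary payload bytes into a huge "string length".
 */
class MisalignedLengthDemo {

    /** Writes a long as a zigzag-encoded, base-128 varint (Avro's binary "long" encoding). */
    static void writeLong(ByteArrayOutputStream out, long value) {
        long n = (value << 1) ^ (value >> 63); // zigzag: small magnitudes map to small unsigned values
        while ((n & ~0x7FL) != 0) {
            out.write((int) ((n & 0x7F) | 0x80)); // low 7 bits with the continuation bit set
            n >>>= 7;
        }
        out.write((int) n);
    }

    /** Writes a string as a length prefix followed by its UTF-8 bytes (Avro's binary "string" encoding). */
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    /** Decodes the zigzag varint starting at offset, i.e. the length a decoder would trust. */
    static long readLengthAt(byte[] buf, int offset) {
        long n = 0;
        int shift = 0;
        int b;
        do {
            b = buf[offset++] & 0xFF;
            n |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0); // keep going while the continuation bit is set
        return (n >>> 1) ^ -(n & 1); // undo zigzag
    }

    /** A made-up two-string record. */
    static byte[] sampleRecord() {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, "flink");   // bytes 0..5:  0x0A 'f' 'l' 'i' 'n' 'k'
        writeString(out, "\u4E2DA"); // bytes 6..10: 0x08 0xE4 0xB8 0xAD 'A'
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] buf = sampleRecord();
        System.out.println(buf.length);           // 11
        System.out.println(readLengthAt(buf, 0)); // 5: correct alignment
        System.out.println(readLengthAt(buf, 6)); // 4: correct alignment
        System.out.println(readLengthAt(buf, 7)); // 68529714: off by one byte
    }
}
```

Starting one byte too late, four non-ASCII payload bytes claim a string of roughly 65 MiB in an 11-byte message; a longer run of bytes with the high bit set can yield lengths in the gigabyte range, which the decoder will then try to allocate.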
--
This message was sent by Atlassian Jira
(v8.3.4#803005)