[ https://issues.apache.org/jira/browse/FLINK-32008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726311#comment-17726311 ]
Ryan Skraba commented on FLINK-32008:
-------------------------------------
Hello, thanks for the example project! It's very helpful for reproducing and
debugging.
The *current* file strategy for protobuf in Flink is to write one record per
line, serialized as binary, with a *{{0x0a}}* (newline) byte appended as the
delimiter. Your example message is serialized as:
{code}
12 06 0a 01 61 12 01 62 0a
{code}
The first *{{0a}}* is the protobuf tag byte for the key field of the map entry.
The last *{{0a}}* is the appended newline (which probably shouldn't be there).
When reading from a file, splits are calculated and assigned to tasks using
*{{0a}}* as the record delimiter, which is very likely to fail: this is a fault
in Flink's protobuf file implementation.
This isn't limited to maps; we can expect this delimiter byte to occur in many
different ways in the protobuf binary (as a tag, a length, or a payload byte).
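To see why this byte is so common: a protobuf tag byte is {{(field_number << 3) | wire_type}}, so field 1 with wire type 2 (length-delimited) is exactly *{{0x0a}}*, the same value as an ASCII newline. A minimal sketch decoding the two tag bytes from the dump above (plain Java, no protobuf dependency; the class name is made up for illustration):

```java
public class TagDecode {
    public static void main(String[] args) {
        // Tag bytes from the dump above: 0x12 opens the map field,
        // 0x0a opens the key field inside the map entry.
        int[] tags = {0x12, 0x0a};
        for (int tag : tags) {
            int fieldNumber = tag >>> 3; // upper bits: field number
            int wireType = tag & 0x7;    // lower 3 bits: wire type
            System.out.printf("tag 0x%02x -> field %d, wire type %d%n",
                    tag, fieldNumber, wireType);
        }
        // 0x0a -> field 1, wire type 2 (length-delimited):
        // byte-identical to '\n', so newline splitting cuts records apart.
    }
}
```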
If this hasn't been addressed before, it's probably because it's pretty rare to
store protobuf messages in a file container (as opposed to a single-message
packet or a table cell). Do you have a good use case that we can use to guide
what we expect Flink to do with protobuf files?
For info, there is nothing in the Protobuf encoding that can be used to
[distinguish the start or
end|https://protobuf.dev/programming-guides/techniques/#streaming] of a
message. If we want to store multiple messages in the same container (a file or
any sequence of bytes), we have to manage the framing ourselves. The above link
recommends writing the message size followed by the binary (in Java, this means
using {{writeDelimitedTo}}/{{parseDelimitedFrom}} instead of
{{writeTo}}/{{parseFrom}}, for example).
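As a sketch of that size-prefixed framing, here is a hand-rolled base-128 varint length prefix (the same scheme {{writeDelimitedTo}} uses), applied to raw byte payloads so it needs no generated protobuf classes:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class DelimitedFraming {

    // Write the payload length as a base-128 varint, then the payload bytes.
    static void writeDelimited(OutputStream out, byte[] payload) throws IOException {
        int len = payload.length;
        while ((len & ~0x7f) != 0) {         // more than 7 bits left?
            out.write((len & 0x7f) | 0x80);  // emit 7 bits with continuation flag
            len >>>= 7;
        }
        out.write(len);                      // final byte, no continuation flag
        out.write(payload);
    }

    // Read one varint-prefixed payload; returns null at end of stream.
    static byte[] readDelimited(InputStream in) throws IOException {
        int b = in.read();
        if (b < 0) return null;              // clean end of container
        int len = b & 0x7f;
        int shift = 7;
        while ((b & 0x80) != 0) {            // continuation flag set: keep reading
            b = in.read();
            len |= (b & 0x7f) << shift;
            shift += 7;
        }
        byte[] payload = new byte[len];
        int read = 0;
        while (read < len) {
            int n = in.read(payload, read, len - read);
            if (n < 0) throw new IOException("truncated message");
            read += n;
        }
        return payload;
    }

    public static void main(String[] args) throws IOException {
        // Two payloads; the first contains the 0x0a byte that defeats
        // newline-based splitting, but framing by length is unaffected.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        writeDelimited(buf, new byte[] {0x0a, 0x01, 0x61});
        writeDelimited(buf, new byte[] {0x12, 0x01, 0x62});

        ByteArrayInputStream in = new ByteArrayInputStream(buf.toByteArray());
        int count = 0;
        while (readDelimited(in) != null) {
            count++;
        }
        System.out.println(count + " messages recovered");
    }
}
```

The catch for Flink is that a file framed this way is not splittable at arbitrary offsets: a reader landing mid-file can't locate a message boundary without scanning from a known start, which is why file formats designed for splitting (Avro, for instance) add sync markers between blocks.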
> Protobuf format throws exception with Map datatype
> --------------------------------------------------
>
> Key: FLINK-32008
> URL: https://issues.apache.org/jira/browse/FLINK-32008
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.17.0
> Reporter: Xuannan Su
> Priority: Major
> Attachments: flink-protobuf-example.zip
>
>
> The protobuf format throws an exception when working with the Map data type.
> I uploaded an example project to reproduce the problem.
>
> {code:java}
> Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:261)
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131)
>     at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417)
>     at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     ... 1 more
> Caused by: java.io.IOException: Failed to deserialize PB object.
>     at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:75)
>     at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:42)
>     at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>     at org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.readRecord(DeserializationSchemaAdapter.java:197)
>     at org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.nextRecord(DeserializationSchemaAdapter.java:210)
>     at org.apache.flink.connector.file.table.DeserializationSchemaAdapter$Reader.readBatch(DeserializationSchemaAdapter.java:124)
>     at org.apache.flink.connector.file.src.util.RecordMapperWrapperRecordIterator$1.readBatch(RecordMapperWrapperRecordIterator.java:82)
>     at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67)
>     at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
>     ... 6 more
> Caused by: java.lang.reflect.InvocationTargetException
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.convertProtoBinaryToRow(ProtoToRowConverter.java:129)
>     at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:70)
>     ... 15 more
> Caused by: com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either that the input has been truncated or that an embedded message misreported its own length.
>     at com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:115)
>     at com.google.protobuf.CodedInputStream$ArrayDecoder.pushLimit(CodedInputStream.java:1196)
>     at com.google.protobuf.CodedInputStream$ArrayDecoder.readMessage(CodedInputStream.java:887)
>     at com.example.proto.MapMessage.<init>(MapMessage.java:64)
>     at com.example.proto.MapMessage.<init>(MapMessage.java:9)
>     at com.example.proto.MapMessage$1.parsePartialFrom(MapMessage.java:756)
>     at com.example.proto.MapMessage$1.parsePartialFrom(MapMessage.java:750)
>     at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:158)
>     at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:191)
>     at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:203)
>     at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:208)
>     at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
>     at com.example.proto.MapMessage.parseFrom(MapMessage.java:320)
>     ... 21 more {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)