[ https://issues.apache.org/jira/browse/FLINK-32008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17726311#comment-17726311 ]

Ryan Skraba commented on FLINK-32008:
-------------------------------------

Hello, thanks for the example project!  It made the problem easy to 
reproduce and debug.

The *current* file strategy for protobuf in Flink is to write each record 
serialized as binary on its own line, appending a *{{0x0a}}* (newline) byte 
after it.  Your example message is serialized as:

{code}
12 06 0a 01 61 12 01 62 0a
{code}

The first *{{0a}}* is the protobuf tag for the key field of the map entry.  
The last *{{0a}}* is the newline delimiter (which probably shouldn't be 
there).
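
You can see this for yourself with a minimal sketch using the generated 
{{MapMessage}} class from the attached project (the {{putEntries}} setter 
name is a guess; use whatever your map field actually generates):

{code:java}
import com.example.proto.MapMessage;

public class DumpBytes {
    public static void main(String[] args) {
        // Single-entry map {"a": "b"}; the setter name depends on the
        // actual field name in the .proto file.
        MapMessage msg = MapMessage.newBuilder().putEntries("a", "b").build();
        for (byte b : msg.toByteArray()) {
            System.out.printf("%02x ", b);
        }
        // prints: 12 06 0a 01 61 12 01 62
        // (the trailing 0a in the file is added by Flink, not by protobuf)
    }
}
{code}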

When reading from a file, splits are calculated and assigned to tasks using 
*{{0a}}* as the record delimiter.  Splitting on a byte that can also appear 
inside a serialized record is very likely to fail, and it is a fault in 
Flink's protobuf file implementation.

I'm guessing this isn't limited to maps: we can expect this delimiter byte 
to occur in many different ways in the protobuf binary.
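
For example, with the plain protobuf-java API (the field numbers here are 
arbitrary), *{{0x0a}}* shows up both as the tag of any length-delimited 
field number 1 and as the varint encoding of the value 10:

{code:java}
import com.google.protobuf.CodedOutputStream;

import java.io.ByteArrayOutputStream;

public class DelimiterCollisions {
    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        CodedOutputStream cos = CodedOutputStream.newInstance(buf);
        // int32 field 3 with value 10: tag 0x18 then varint 0x0a --
        // the delimiter byte appears with no newline involved.
        cos.writeInt32(3, 10);
        // Any string field numbered 1 starts with tag 0x0a
        // (field 1, wire type 2).
        cos.writeString(1, "hi");
        cos.flush();
        for (byte b : buf.toByteArray()) {
            System.out.printf("%02x ", b);
        }
        // prints: 18 0a 0a 02 68 69
    }
}
{code}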

If this hasn't been addressed, it's probably because it's pretty rare to store 
protobuf messages in a file container (as opposed to in a single message 
packet, or a table cell).  Do you have a good use case that we can use to guide 
what we expect Flink to do with protobuf files?

For info, there is nothing in the Protobuf encoding that can be used to 
[distinguish the start or end|https://protobuf.dev/programming-guides/techniques/#streaming] 
of a message.  If we want to store multiple messages in the same container 
(a file or any other sequence of bytes), we have to manage the boundaries 
ourselves.  The above link recommends writing the message size followed by 
the binary (in Java, using {{writeDelimitedTo}}/{{parseDelimitedFrom}} 
instead of {{writeTo}}/{{parseFrom}}, for example).
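
A rough sketch of that size-prefixed approach, again assuming the generated 
{{MapMessage}} class and the hypothetical {{putEntries}} setter:

{code:java}
import com.example.proto.MapMessage;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;

public class DelimitedRoundTrip {
    public static void main(String[] args) throws Exception {
        MapMessage first = MapMessage.newBuilder().putEntries("a", "b").build();
        MapMessage second = MapMessage.newBuilder().putEntries("c", "d").build();

        // writeDelimitedTo prefixes each message with its varint-encoded
        // size, so no delimiter byte is needed between records.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        first.writeDelimitedTo(out);
        second.writeDelimitedTo(out);

        // parseDelimitedFrom reads one size-prefixed message at a time and
        // returns null at end of stream.
        InputStream in = new ByteArrayInputStream(out.toByteArray());
        MapMessage msg;
        while ((msg = MapMessage.parseDelimitedFrom(in)) != null) {
            System.out.println(msg);
        }
    }
}
{code}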

> Protobuf format throws exception with Map datatype
> --------------------------------------------------
>
>                 Key: FLINK-32008
>                 URL: https://issues.apache.org/jira/browse/FLINK-32008
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.17.0
>            Reporter: Xuannan Su
>            Priority: Major
>         Attachments: flink-protobuf-example.zip
>
>
> The protobuf format throws an exception when working with the Map data 
> type. I uploaded an example project to reproduce the problem.
>
> {code:java}
> Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:261)
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:131)
>     at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417)
>     at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     ... 1 more
> Caused by: java.io.IOException: Failed to deserialize PB object.
>     at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:75)
>     at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:42)
>     at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>     at org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.readRecord(DeserializationSchemaAdapter.java:197)
>     at org.apache.flink.connector.file.table.DeserializationSchemaAdapter$LineBytesInputFormat.nextRecord(DeserializationSchemaAdapter.java:210)
>     at org.apache.flink.connector.file.table.DeserializationSchemaAdapter$Reader.readBatch(DeserializationSchemaAdapter.java:124)
>     at org.apache.flink.connector.file.src.util.RecordMapperWrapperRecordIterator$1.readBatch(RecordMapperWrapperRecordIterator.java:82)
>     at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67)
>     at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
>     ... 6 more
> Caused by: java.lang.reflect.InvocationTargetException
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.convertProtoBinaryToRow(ProtoToRowConverter.java:129)
>     at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.deserialize(PbRowDataDeserializationSchema.java:70)
>     ... 15 more
> Caused by: com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field.  This could mean either that the input has been truncated or that an embedded message misreported its own length.
>     at com.google.protobuf.InvalidProtocolBufferException.truncatedMessage(InvalidProtocolBufferException.java:115)
>     at com.google.protobuf.CodedInputStream$ArrayDecoder.pushLimit(CodedInputStream.java:1196)
>     at com.google.protobuf.CodedInputStream$ArrayDecoder.readMessage(CodedInputStream.java:887)
>     at com.example.proto.MapMessage.<init>(MapMessage.java:64)
>     at com.example.proto.MapMessage.<init>(MapMessage.java:9)
>     at com.example.proto.MapMessage$1.parsePartialFrom(MapMessage.java:756)
>     at com.example.proto.MapMessage$1.parsePartialFrom(MapMessage.java:750)
>     at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:158)
>     at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:191)
>     at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:203)
>     at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:208)
>     at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
>     at com.example.proto.MapMessage.parseFrom(MapMessage.java:320)
>     ... 21 more
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
