[
https://issues.apache.org/jira/browse/FLINK-34954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844507#comment-17844507
]
Rui Fan commented on FLINK-34954:
---------------------------------
Hi [~q.xu] , I wanna check with you did you meet this issue in your production
job? or it only happens for your test code.
After I read the flink related code in detail, I guess this issue never happen
for flink job.
h1. Reason:
This exception happens inside of NoFetchingInput.readBytes, and when
inputStream.read return -1.
h2. Why doesn't flink job throw exception?
In flink code, the inputStream is DataInputViewStream, check
KryoSerializer#deserialize method.
{code:java}
DataInputViewStream inputStream = new
DataInputViewStream(source);
input = new NoFetchingInput(inputStream);{code}
DataInputViewStream#read calls
org.apache.flink.core.memory.DataInputView#read(byte[], int, int). From its
comment, we can see : <p>If <code>len</code> is zero, then no bytes are read
and <code>0</code> is returned;
So when length is 0, inputStream.read will return 0. And
NoFetchingInput.readBytes won't throw exception.
h2. Why does your test demo throw exception?
Your demo is using java.io.ByteArrayInputStream as the inputStream. From its
comment, we can see
{code:java}
* @return the total number of bytes read into the buffer, or
* <code>-1</code> if there is no more data because the end of
* the stream has been reached. {code}
It returns -1 even if length is 0.
When return value is -1, NoFetchingInput.readBytes will throw exception.
Please correct me if anything is wrong, thanks~
> Kryo input implementation NoFetchingInput fails to handle zero length bytes
> ---------------------------------------------------------------------------
>
> Key: FLINK-34954
> URL: https://issues.apache.org/jira/browse/FLINK-34954
> Project: Flink
> Issue Type: Bug
> Components: API / Type Serialization System
> Affects Versions: 1.19.0
> Reporter: Qinghui Xu
> Assignee: Qinghui Xu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.20.0
>
>
> If the serailized bytes are empty, `NoFetchingInput` will run into error when
> Kryo tries to deserialize it.
> Example: a protobuf 3 object that contains only default values will be
> serialized as 0 length byte array, and the deserialization later will fail.
> Illustration:
> {noformat}
> import com.esotericsoftware.kryo.Kryo
> import com.esotericsoftware.kryo.io.{ByteBufferInput, ByteBufferOutput,
> Input, Output}
> import com.google.protobuf.{DescriptorProtos, Message}import
> com.twitter.chill.protobuf.ProtobufSerializer
> import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
> import java.io.ByteArrayInputStream
>
> object ProtoSerializationTest {
> def main(args: Array[String]) = {
> val chillProtoSerializer = new ProtobufSerializer
> val protomessage = DescriptorProtos.DescriptorProto.getDefaultInstance
> val output: Output = new ByteBufferOutput(1000)
> chillProtoSerializer.write(null, output, protomessage)
> val serialized: Array[Byte] = output.toBytes
> println(s"Serialized : $serialized")
> val input: Input = new NoFetchingInput(new
> ByteArrayInputStream(serialized))
> val deserialized = chillProtoSerializer.read(null, input,
> classOf[BillableClick].asInstanceOf[Class[Message]])
> println(deserialized)
> }
> }
> {noformat}
>
> Error
> {noformat}
> Exception in thread "main" java.lang.RuntimeException: Could not create class
> com.criteo.glup.BillableClickProto$BillableClick
> at
> com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76)
> at
> com.criteo.streaming.common.bootstrap.ProtoSerialization$.main(ProtoSerialization.scala:22)
> at ProtoSerialization.main(ProtoSerialization.scala)
> Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No
> more bytes left.
> at
> org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128)
> at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:332)
> at
> com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73)
> ... 2 more
> Caused by: java.io.EOFException: No more bytes left.
> ... 5 more{noformat}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)