[ 
https://issues.apache.org/jira/browse/FLINK-34954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17844507#comment-17844507
 ] 

Rui Fan commented on FLINK-34954:
---------------------------------

Hi [~q.xu], I'd like to check with you: did you hit this issue in a production 
job, or does it only happen in your test code?

After reading the related Flink code in detail, I believe this issue never 
happens in a Flink job.
h1. Reason:

This exception is thrown inside NoFetchingInput.readBytes when 
inputStream.read returns -1.
h2. Why doesn't a Flink job throw the exception?

In Flink code, the inputStream is a DataInputViewStream; see the 
KryoSerializer#deserialize method.
{code:java}
DataInputViewStream inputStream = new DataInputViewStream(source);
input = new NoFetchingInput(inputStream);
{code}
DataInputViewStream#read calls 
org.apache.flink.core.memory.DataInputView#read(byte[], int, int). Its Javadoc 
says: "If len is zero, then no bytes are read and 0 is returned".

So when the length is 0, inputStream.read returns 0, and 
NoFetchingInput.readBytes does not throw an exception.
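
To make that reasoning concrete, here is a minimal sketch using only the JDK. It is not Flink's code: readFully is a hypothetical simplification of the read loop in NoFetchingInput.readBytes as I understand it, and ZeroOnEmptyReadStream is a made-up stand-in for a stream that, like DataInputViewStream, returns 0 for a zero-length read.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

class ReadLoopSketch {

    /** Hypothetical stand-in: holds no data, but returns 0 when len == 0. */
    static class ZeroOnEmptyReadStream extends InputStream {
        @Override
        public int read() {
            return -1; // no data at all
        }

        @Override
        public int read(byte[] b, int off, int len) {
            // Zero-length reads report 0; anything else hits end of stream.
            return len == 0 ? 0 : -1;
        }
    }

    /** Assumed, simplified read loop: any -1 from the stream is treated as EOF. */
    static void readFully(InputStream in, byte[] bytes, int offset, int count)
            throws IOException {
        int bytesRead = 0;
        while (true) {
            int c = in.read(bytes, offset + bytesRead, count - bytesRead);
            if (c == -1) {
                throw new EOFException("No more bytes left.");
            }
            bytesRead += c;
            if (bytesRead == count) {
                break;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // count == 0: read returns 0, the loop exits, and nothing is thrown.
        readFully(new ZeroOnEmptyReadStream(), new byte[0], 0, 0);
        System.out.println("zero-length read completed without EOFException");
    }
}
```

With a stream like this, a zero-length read leaves the loop on the first iteration, so the EOF branch is never reached.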
h2. Why does your test demo throw the exception?

Your demo uses java.io.ByteArrayInputStream as the inputStream. Its Javadoc 
says:
{code:java}
* @return  the total number of bytes read into the buffer, or
*          <code>-1</code> if there is no more data because the end of
*          the stream has been reached. {code}
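Once the end of the stream has been reached, it returns -1 even if the length is 0, and a stream over an empty byte array is at its end immediately.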

When the return value is -1, NoFetchingInput.readBytes throws the exception.
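
The ByteArrayInputStream behavior can be checked with the JDK alone. The snippet below is only an illustration; the class and method names (ByteArrayEofDemo, zeroLengthRead) are made up for this comment and do not come from Flink or from the demo in the issue.

```java
import java.io.ByteArrayInputStream;

class ByteArrayEofDemo {

    /** Returns what ByteArrayInputStream.read(buf, 0, 0) reports for the given content. */
    static int zeroLengthRead(byte[] content) {
        ByteArrayInputStream in = new ByteArrayInputStream(content);
        return in.read(new byte[0], 0, 0);
    }

    public static void main(String[] args) {
        // Empty content: the stream is already at its end, so -1 is returned.
        System.out.println(zeroLengthRead(new byte[0]));
        // One byte still available: a zero-length read returns 0.
        System.out.println(zeroLengthRead(new byte[] {1}));
    }
}
```

So the -1 only shows up when the zero-length read happens at the end of the stream, which is exactly the case for an empty serialized byte array.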

Please correct me if anything is wrong, thanks~

> Kryo input implementation NoFetchingInput fails to handle zero length bytes
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-34954
>                 URL: https://issues.apache.org/jira/browse/FLINK-34954
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Type Serialization System
>    Affects Versions: 1.19.0
>            Reporter: Qinghui Xu
>            Assignee: Qinghui Xu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.20.0
>
>
> If the serialized bytes are empty, `NoFetchingInput` will run into an error when 
> Kryo tries to deserialize them.
> Example: a protobuf 3 object that contains only default values will be 
> serialized as 0 length byte array, and the deserialization later will fail. 
> Illustration:
> {noformat}
> import com.esotericsoftware.kryo.Kryo
> import com.esotericsoftware.kryo.io.{ByteBufferInput, ByteBufferOutput, Input, Output}
> import com.google.protobuf.{DescriptorProtos, Message}
> import com.twitter.chill.protobuf.ProtobufSerializer
> import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
> import java.io.ByteArrayInputStream
>  
> object ProtoSerializationTest {
>   def main(args: Array[String]) = {     
>     val chillProtoSerializer = new ProtobufSerializer
>     val protomessage = DescriptorProtos.DescriptorProto.getDefaultInstance
>     val output: Output = new ByteBufferOutput(1000)
>     chillProtoSerializer.write(null, output, protomessage)
>     val serialized: Array[Byte] = output.toBytes
>     println(s"Serialized : $serialized")
>     val input: Input = new NoFetchingInput(new ByteArrayInputStream(serialized))
>     val deserialized = chillProtoSerializer.read(null, input, classOf[BillableClick].asInstanceOf[Class[Message]])
>     println(deserialized)
>   }
> }
> {noformat}
>  
> Error
> {noformat}
> Exception in thread "main" java.lang.RuntimeException: Could not create class 
> com.criteo.glup.BillableClickProto$BillableClick
>     at 
> com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76)
>     at 
> com.criteo.streaming.common.bootstrap.ProtoSerialization$.main(ProtoSerialization.scala:22)
>     at ProtoSerialization.main(ProtoSerialization.scala)
> Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No 
> more bytes left.
>     at 
> org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128)
>     at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:332)
>     at 
> com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73)
>     ... 2 more
> Caused by: java.io.EOFException: No more bytes left.
>     ... 5 more{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
