[ 
https://issues.apache.org/jira/browse/FLINK-34954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qinghui Xu updated FLINK-34954:
-------------------------------
    Description: 
If the serailized bytes are empty, `NoFetchingInput` will run into error when 
Kryo tries to deserialize it.

Example: a protobuf 3 object that contains only default values will be 
serialized as 0 length byte array, and the deserialization later will fail. 
Illustration:
{noformat}
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{ByteBufferInput, ByteBufferOutput, Input, 
Output}
import com.google.protobuf.{DescriptorProtos, Message}import 
com.twitter.chill.protobuf.ProtobufSerializer
import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
import java.io.ByteArrayInputStream
 
object ProtoSerializationTest {
  def main(args: Array[String]) = {     
    val chillProtoSerializer = new ProtobufSerializer
    val protomessage = DescriptorProtos.DescriptorProto.getDefaultInstance
    val output: Output = new ByteBufferOutput(1000)
    chillProtoSerializer.write(null, output, protomessage)
    val serialized: Array[Byte] = output.toBytes
    println(s"Serialized : $serialized")
    val input: Input = new NoFetchingInput(new ByteArrayInputStream(serialized))
    val deserialized = chillProtoSerializer.read(null, input, 
classOf[BillableClick].asInstanceOf[Class[Message]])
    println(deserialized)
  }
}
{noformat}
 

Error
{noformat}
Exception in thread "main" java.lang.RuntimeException: Could not create class 
com.criteo.glup.BillableClickProto$BillableClick
    at 
com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76)
    at 
com.criteo.streaming.common.bootstrap.ProtoSerialization$.main(ProtoSerialization.scala:22)
    at ProtoSerialization.main(ProtoSerialization.scala)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No 
more bytes left.
    at 
org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128)
    at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:332)
    at 
com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73)
    ... 2 more
Caused by: java.io.EOFException: No more bytes left.
    ... 5 more{noformat}

  was:
If the serailized bytes are empty, `NoFetchingInput` will run into error when 
Kryo tries to deserialize it.

Example: a protobuf 3 object that contains only default values will be 
serialized as 0 length byte array, and the deserialization later will fail. 
Illustration:
```
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.\{ByteBufferInput, ByteBufferOutput, Input, 
Output}

import com.google.protobuf.\{DescriptorProtos, Message}import 
com.twitter.chill.protobuf.ProtobufSerializer
import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput

import java.io.ByteArrayInputStream

object ProtoSerializationTest {

  def main(args: Array[String]) = {
    val chillProtoSerializer = new ProtobufSerializer

    val protomessage = DescriptorProtos.DescriptorProto.getDefaultInstance    
val output: Output = new ByteBufferOutput(1000)
    chillProtoSerializer.write(null, output, protomessage)
    val serialized: Array[Byte] = output.toBytes
    println(s"Serialized : $serialized")
    val input: Input = new NoFetchingInput(new ByteArrayInputStream(serialized))
    val deserialized = chillProtoSerializer.read(null, input, 
classOf[BillableClick].asInstanceOf[Class[Message]])
    println(deserialized)
  }
}
```

Error
```

Exception in thread "main" java.lang.RuntimeException: Could not create class 
com.criteo.glup.BillableClickProto$BillableClick
    at 
com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76)
    at 
com.criteo.streaming.common.bootstrap.ProtoSerialization$.main(ProtoSerialization.scala:22)
    at ProtoSerialization.main(ProtoSerialization.scala)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No 
more bytes left.
    at 
org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128)
    at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:332)
    at 
com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73)
    ... 2 more
Caused by: java.io.EOFException: No more bytes left.
    ... 5 more

```


> Kryo input implementation NoFetchingInput fails to handle zero length bytes
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-34954
>                 URL: https://issues.apache.org/jira/browse/FLINK-34954
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Qinghui Xu
>            Priority: Major
>
> If the serailized bytes are empty, `NoFetchingInput` will run into error when 
> Kryo tries to deserialize it.
> Example: a protobuf 3 object that contains only default values will be 
> serialized as 0 length byte array, and the deserialization later will fail. 
> Illustration:
> {noformat}
> import com.esotericsoftware.kryo.Kryo
> import com.esotericsoftware.kryo.io.{ByteBufferInput, ByteBufferOutput, 
> Input, Output}
> import com.google.protobuf.{DescriptorProtos, Message}import 
> com.twitter.chill.protobuf.ProtobufSerializer
> import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
> import java.io.ByteArrayInputStream
>  
> object ProtoSerializationTest {
>   def main(args: Array[String]) = {     
>     val chillProtoSerializer = new ProtobufSerializer
>     val protomessage = DescriptorProtos.DescriptorProto.getDefaultInstance
>     val output: Output = new ByteBufferOutput(1000)
>     chillProtoSerializer.write(null, output, protomessage)
>     val serialized: Array[Byte] = output.toBytes
>     println(s"Serialized : $serialized")
>     val input: Input = new NoFetchingInput(new 
> ByteArrayInputStream(serialized))
>     val deserialized = chillProtoSerializer.read(null, input, 
> classOf[BillableClick].asInstanceOf[Class[Message]])
>     println(deserialized)
>   }
> }
> {noformat}
>  
> Error
> {noformat}
> Exception in thread "main" java.lang.RuntimeException: Could not create class 
> com.criteo.glup.BillableClickProto$BillableClick
>     at 
> com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76)
>     at 
> com.criteo.streaming.common.bootstrap.ProtoSerialization$.main(ProtoSerialization.scala:22)
>     at ProtoSerialization.main(ProtoSerialization.scala)
> Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No 
> more bytes left.
>     at 
> org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128)
>     at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:332)
>     at 
> com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73)
>     ... 2 more
> Caused by: java.io.EOFException: No more bytes left.
>     ... 5 more{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to