[
https://issues.apache.org/jira/browse/FLINK-34954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Danny Cranmer reassigned FLINK-34954:
-------------------------------------
Assignee: Qinghui Xu
> Kryo input implementation NoFetchingInput fails to handle zero length bytes
> ---------------------------------------------------------------------------
>
> Key: FLINK-34954
> URL: https://issues.apache.org/jira/browse/FLINK-34954
> Project: Flink
> Issue Type: Bug
> Components: API / Type Serialization System
> Affects Versions: 1.19.0
> Reporter: Qinghui Xu
> Assignee: Qinghui Xu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.20.0
>
>
> If the serialized bytes are empty, `NoFetchingInput` throws an error when
> Kryo tries to deserialize them.
> Example: a protobuf 3 object that contains only default values is
> serialized as a zero-length byte array, and the later deserialization fails.
> Illustration:
> {noformat}
> import com.esotericsoftware.kryo.Kryo
> import com.esotericsoftware.kryo.io.{ByteBufferInput, ByteBufferOutput, Input, Output}
> import com.google.protobuf.{DescriptorProtos, Message}
> import com.twitter.chill.protobuf.ProtobufSerializer
> import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
> import java.io.ByteArrayInputStream
>
> object ProtoSerializationTest {
>   def main(args: Array[String]): Unit = {
>     val chillProtoSerializer = new ProtobufSerializer
>     // A default instance has no fields set, so it serializes to zero bytes.
>     val protomessage = DescriptorProtos.DescriptorProto.getDefaultInstance
>     val output: Output = new ByteBufferOutput(1000)
>     chillProtoSerializer.write(null, output, protomessage)
>     val serialized: Array[Byte] = output.toBytes
>     println(s"Serialized : $serialized")
>     // Wrap the (empty) serialized bytes in the Flink input under test.
>     val input: Input = new NoFetchingInput(new ByteArrayInputStream(serialized))
>     // BillableClick is the reporter's own protobuf class (see the stack trace
>     // below); its import is not shown in this snippet.
>     val deserialized = chillProtoSerializer.read(null, input,
>       classOf[BillableClick].asInstanceOf[Class[Message]])
>     println(deserialized)
>   }
> }
> {noformat}
>
> Error:
> {noformat}
> Exception in thread "main" java.lang.RuntimeException: Could not create class com.criteo.glup.BillableClickProto$BillableClick
>     at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76)
>     at com.criteo.streaming.common.bootstrap.ProtoSerialization$.main(ProtoSerialization.scala:22)
>     at ProtoSerialization.main(ProtoSerialization.scala)
> Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No more bytes left.
>     at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128)
>     at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:332)
>     at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73)
>     ... 2 more
> Caused by: java.io.EOFException: No more bytes left.
>     ... 5 more
> {noformat}
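
The stack trace suggests a read loop that treats a `-1` from the underlying stream as end of input. The sketch below reproduces that pattern using only `java.io`, under the assumption (not taken from the Flink source) that the loop issues at least one `read` even when zero bytes are requested. `java.io.ByteArrayInputStream.read(byte[], int, int)` returns `-1` on an exhausted stream even when the requested length is 0, so the unguarded variant fails on empty input, while a variant that skips the read when `count` is 0 does not. `ZeroLengthReadSketch`, `readFullyUnguarded`, and `readFullyGuarded` are hypothetical names used for illustration only.

```scala
import java.io.{ByteArrayInputStream, EOFException, InputStream}
import scala.util.Try

object ZeroLengthReadSketch {

  // Hypothetical helper, not Flink code: reads exactly `count` bytes into `bytes`,
  // treating a -1 return value from the stream as end of input.
  def readFullyUnguarded(in: InputStream, bytes: Array[Byte], offset: Int, count: Int): Unit = {
    var bytesRead = 0
    var done = false
    while (!done) {
      // Always issues at least one read, even when count == 0.
      val c = in.read(bytes, offset + bytesRead, count - bytesRead)
      if (c == -1) throw new EOFException("No more bytes left.")
      bytesRead += c
      done = bytesRead == count
    }
  }

  // Same loop, but a zero-length request never touches the stream.
  def readFullyGuarded(in: InputStream, bytes: Array[Byte], offset: Int, count: Int): Unit = {
    if (count > 0) readFullyUnguarded(in, bytes, offset, count)
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for a serialized message that occupies zero bytes.
    val empty = new Array[Byte](0)

    val unguarded = Try(readFullyUnguarded(new ByteArrayInputStream(empty), new Array[Byte](0), 0, 0))
    println(s"unguarded zero-length read succeeded: ${unguarded.isSuccess}")

    val guarded = Try(readFullyGuarded(new ByteArrayInputStream(empty), new Array[Byte](0), 0, 0))
    println(s"guarded zero-length read succeeded: ${guarded.isSuccess}")
  }
}
```

The guard is only one possible way to handle the empty case; whether it matches the change made for this issue is not stated here.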
--
This message was sent by Atlassian Jira
(v8.20.10#820010)