[ https://issues.apache.org/jira/browse/FLINK-29347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yun Tang updated FLINK-29347:
-----------------------------
    Fix Version/s: 1.17.0

> Failed to restore from list state with empty protobuf object
> ------------------------------------------------------------
>
>                 Key: FLINK-29347
>                 URL: https://issues.apache.org/jira/browse/FLINK-29347
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.14.2, 1.15.0
>            Reporter: shen
>            Assignee: shen
>            Priority: Major
>              Labels: bugfix, checkpoint, pull-request-available, states
>             Fix For: 1.17.0
>
>
> I use a protobuf-generated class in a union list state.
> When my Flink job restores from a checkpoint, I get this exception:
> {code:java}
> Caused by: java.lang.RuntimeException: Could not create class com.MY_PROTOBUF_GENERATED_CLASS
>     at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76) ~[my-lib-0.1.1-SNAPSHOT.jar:?]
>     at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40) ~[my-lib-0.1.1-SNAPSHOT.jar:?]
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:217) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:188) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:80) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:482) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
> Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No more bytes left.
>     at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:314) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73) ~[my-lib-0.1.1-SNAPSHOT.jar:?]
>     at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40) ~[my-lib-0.1.1-SNAPSHOT.jar:?]
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:217) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:188) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:80) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:482) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
> Caused by: java.io.EOFException: No more bytes left.
>     at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:314) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73) ~[my-lib-0.1.1-SNAPSHOT.jar:?]
>     at com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40) ~[my-lib-0.1.1-SNAPSHOT.jar:?]
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:217) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:188) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:80) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:482) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.12-1.14.4.jar:1.14.4]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
> {code}
>
> The cause is as follows. When the protobuf serializer serializes an object that was built directly from its builder without assigning a value to any field, it produces a zero-length byte[] and writes it into state as the single byte '\0' (indicating zero-length data).
> When the job recovers from the checkpoint, the protobuf serializer deserializes the data. It reads a length of 0 and, as the stack trace shows, calls Input#readBytes with count = 0.
> The underlying Input implementation is [NoFetchingInput|https://github.com/apache/flink/blob/9d2ae5572897f3e2d9089414261a250cfc2a2ab8/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java], which in turn calls InputStream#read(byte[] bytes, int offset, int count) with count = 0.
> The InputStream implementation is [ByteStateHandleInputStream|https://github.com/apache/flink/blob/53d5e1cf9666517bc2fded60b510f2fd13d93f10/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java#L140-L153]. It {*}returns -1 whenever no data is left in memory, even if count is 0{*}.
> A simple fix is to add a check before returning -1: if the caller reads 0 bytes, the method should always return 0 instead of -1.
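The proposed check can be sketched roughly as follows. This is a minimal illustration, not the actual Flink patch: `InMemoryStateStream` and `StrictReader` are hypothetical stand-ins for the in-memory state handle stream and for a caller that treats -1 as a hard end-of-data error, and their internals are assumptions. The zero-length rule itself matches the documented contract of `java.io.InputStream#read(byte[], int, int)`, which returns 0 when `len` is 0.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical in-memory stream; an assumed stand-in, not Flink's class. */
final class InMemoryStateStream extends InputStream {
    private final byte[] data;
    private int index;

    InMemoryStateStream(byte[] data) {
        this.data = data;
    }

    @Override
    public int read() {
        return index < data.length ? (data[index++] & 0xFF) : -1;
    }

    @Override
    public int read(byte[] b, int off, int len) {
        if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        }
        // The proposed check: a zero-length read succeeds trivially,
        // even when no data is left.
        if (len == 0) {
            return 0;
        }
        int remaining = data.length - index;
        if (remaining <= 0) {
            return -1; // end of data, reported only for non-empty requests
        }
        int n = Math.min(len, remaining);
        System.arraycopy(data, index, b, off, n);
        index += n;
        return n;
    }
}

/** Hypothetical caller that fails on -1, in the spirit of the reported trace. */
final class StrictReader {
    static void readBytes(InputStream in, byte[] bytes, int offset, int count)
            throws IOException {
        int bytesRead = 0;
        do {
            int c = in.read(bytes, offset + bytesRead, count - bytesRead);
            if (c == -1) {
                throw new EOFException("No more bytes left.");
            }
            bytesRead += c;
        } while (bytesRead < count);
    }
}
```

With this check in place, a state entry consisting only of the length byte `'\0'` can be read back: the caller consumes the length byte, then requests 0 bytes at the end of the data and receives 0 instead of -1, so no `EOFException` is raised. A non-empty read at the end of the data still returns -1.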