[
https://issues.apache.org/jira/browse/FLINK-29347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
shen updated FLINK-29347:
-------------------------
Description:
I use protobuf generated class in an union list state.
When my flink job restores from checkpoint, I get exception:
{code:java}
Caused by: java.lang.RuntimeException: Could not create class
com.MY_PROTOBUF_GENERATED_CLASS
at
com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76)
~[my-lib-0.1.1-SNAPSHOT.jar:?]
at
com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40)
~[my-lib-0.1.1-SNAPSHOT.jar:?]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:217)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:188)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:80)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:482)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No
more bytes left.
at
org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:314)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73)
~[my-lib-0.1.1-SNAPSHOT.jar:?]
at
com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40)
~[my-lib-0.1.1-SNAPSHOT.jar:?]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:217)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:188)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:80)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:482)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
Caused by: java.io.EOFException: No more bytes left.
at
org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:314)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73)
~[my-lib-0.1.1-SNAPSHOT.jar:?]
at
com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40)
~[my-lib-0.1.1-SNAPSHOT.jar:?]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:217)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:188)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:80)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:482)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
{code}
I found that this happens because, when the protobuf serializer serializes an
object that was built directly with a builder without assigning any value to
its fields, the serializer generates a zero-length byte[] and writes it into
state with content '\0' (which encodes a length of zero).
When restoring from the checkpoint, the protobuf serializer deserializes the
data. It reads a length of 0 and calls Kryo's Input#readBytes with that length,
i.e. it asks to read count = 0 bytes.
The underlying Input implementation is
[NoFetchingInput|https://github.com/apache/flink/blob/9d2ae5572897f3e2d9089414261a250cfc2a2ab8/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java].
It calls InputStream#read(byte[] bytes, int offset, int count) with count =
0.
The InputStream implementation is
[ByteStateHandleInputStream|https://github.com/apache/flink/blob/fcbbd44695524770bb1c000b8b8cfb5884536a62/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java#L133-L146].
It {*}returns -1 whenever no data is left in memory, even if count is 0{*}.
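A simple fix is to add a check before returning -1: if the caller requests 0
bytes, read should always return 0 instead of -1. This is what the
java.io.InputStream#read(byte[], int, int) contract requires ("If len is zero,
then no bytes are read and 0 is returned").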
was:
I use protobuf generated class in an union list state.
When my flink job restores from checkpoint, I get exception:
{code:java}
Caused by: java.lang.RuntimeException: Could not create class
com.MY_PROTOBUF_GENERATED_CLASS
at
com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76)
~[my-lib-0.1.1-SNAPSHOT.jar:?]
at
com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40)
~[my-lib-0.1.1-SNAPSHOT.jar:?]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:217)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:188)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:80)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:482)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No
more bytes left.
at
org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:314)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73)
~[my-lib-0.1.1-SNAPSHOT.jar:?]
at
com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40)
~[my-lib-0.1.1-SNAPSHOT.jar:?]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:217)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:188)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:80)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:482)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
Caused by: java.io.EOFException: No more bytes left.
at
org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:314)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73)
~[my-lib-0.1.1-SNAPSHOT.jar:?]
at
com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40)
~[my-lib-0.1.1-SNAPSHOT.jar:?]
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:217)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:188)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:80)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:482)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
~[flink-dist_2.12-1.14.4.jar:1.14.4]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
{code}
I find it is because when protobuf serializer serializes an object, which is
built directly with builder without assign any value to field, the serializer
will generate a zero length byte[] and then write it into state with content
'\0'(indicates zero length data).
When recovered from checkpoint,protobuf seralizer deserialize the data. It get
length 0,and call InputStream#read(byte[] bytes, int offset, int count) with
count = 0.
The underlying Input implementation is
[NoFetchingInput|https://github.com/apache/flink/blob/9d2ae5572897f3e2d9089414261a250cfc2a2ab8/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java].
It will call Inputsteam#read(byte[] bytes, int offset, int count) with count =
0.
The InputStream implementation is
[ByteStateHandleInputStream|https://gitlab.sensorsdata.cn/sensors-analytics/flink/-/blob/fcbbd44695524770bb1c000b8b8cfb5884536a62/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java#L133-146],It
will {*}return -1 as long as no data left in memory,even if count is 0{*}.
A simple fix is add check before return -1. If caller reads 0 bytes, it should
always return 0 instead of -1
> Failed to restore from list state with empty protobuf object
> ------------------------------------------------------------
>
> Key: FLINK-29347
> URL: https://issues.apache.org/jira/browse/FLINK-29347
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.14.2, 1.15.0
> Reporter: shen
> Priority: Major
> Labels: bugfix, checkpoint, states
>
> I use protobuf generated class in an union list state.
> When my flink job restores from checkpoint, I get exception:
> {code:java}
> Caused by: java.lang.RuntimeException: Could not create class
> com.MY_PROTOBUF_GENERATED_CLASS
> at
> com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:76)
> ~[my-lib-0.1.1-SNAPSHOT.jar:?]
> at
> com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40)
> ~[my-lib-0.1.1-SNAPSHOT.jar:?]
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:217)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:188)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:80)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:482)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
> Caused by: com.esotericsoftware.kryo.KryoException: java.io.EOFException: No
> more bytes left.
> at
> org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:314)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73)
> ~[my-lib-0.1.1-SNAPSHOT.jar:?]
> at
> com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40)
> ~[my-lib-0.1.1-SNAPSHOT.jar:?]
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:217)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:188)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:80)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:482)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
> Caused by: java.io.EOFException: No more bytes left.
> at
> org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:128)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:314)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:73)
> ~[my-lib-0.1.1-SNAPSHOT.jar:?]
> at
> com.twitter.chill.protobuf.ProtobufSerializer.read(ProtobufSerializer.java:40)
> ~[my-lib-0.1.1-SNAPSHOT.jar:?]
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:217)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:188)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:80)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:482)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> ~[flink-dist_2.12-1.14.4.jar:1.14.4]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
> {code}
>
> I found that this happens because, when the protobuf serializer serializes an
> object that was built directly with its builder without assigning a value to
> any field, the serializer generates a zero-length byte[] and writes it into the
> state with content '\0' (which encodes the zero length of the data).
> When the job is restored from the checkpoint, the protobuf serializer
> deserializes the data: it reads length 0 and calls
> InputStream#read(byte[] bytes, int offset, int count) with count = 0.
> The underlying Input implementation is
> [NoFetchingInput|https://github.com/apache/flink/blob/9d2ae5572897f3e2d9089414261a250cfc2a2ab8/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java],
> which in turn calls InputStream#read(byte[] bytes, int offset, int count) with
> count = 0.
> The InputStream implementation is
> [ByteStateHandleInputStream|https://github.com/apache/flink/blob/fcbbd44695524770bb1c000b8b8cfb5884536a62/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java#L133-L146].
> It {*}returns -1 whenever no data is left in memory, even if count is 0{*}, so
> NoFetchingInput treats the zero-length read as end of stream and throws
> EOFException("No more bytes left.").
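> A simple fix is to add a check before returning -1: if the caller requests
> 0 bytes, read should always return 0 instead of -1. This matches the
> InputStream#read(byte[], int, int) contract ("If len is zero, then no bytes are
> read and 0 is returned").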
--
This message was sent by Atlassian Jira
(v8.20.10#820010)