[ https://issues.apache.org/jira/browse/FLINK-22608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Piotr Nowojski updated FLINK-22608:
-----------------------------------
Component/s: (was: Runtime / State Backends)
(was: Runtime / Checkpointing)
API / Type Serialization System
> Flink Kryo deserialize read wrong bytes
> ---------------------------------------
>
> Key: FLINK-22608
> URL: https://issues.apache.org/jira/browse/FLINK-22608
> Project: Flink
> Issue Type: Bug
> Components: API / Type Serialization System
> Affects Versions: 1.12.0
> Reporter: Si Chen
> Priority: Major
> Labels: stale-major
>
> In my Flink program, I use ValueState to store my state, and the state holds a POJO.
> The POJO is serialized with the Kryo serializer. After the program had been running
> for some time, I added a field to the POJO and then restored the program from a
> checkpoint. I found that the value of the field was incorrect. Reading the source code, I found:
>
> {code:java}
> // From org.apache.flink.runtime.state.heap.HeapRestoreOperation#readStateHandleStateData
> private void readStateHandleStateData(
>         FSDataInputStream fsDataInputStream,
>         DataInputViewStreamWrapper inView,
>         KeyGroupRangeOffsets keyGroupOffsets,
>         Map<Integer, StateMetaInfoSnapshot> kvStatesById,
>         int numStates,
>         int readVersion,
>         boolean isCompressed) throws IOException {
>     final StreamCompressionDecorator streamCompressionDecorator = isCompressed
>             ? SnappyStreamCompressionDecorator.INSTANCE
>             : UncompressedStreamCompressionDecorator.INSTANCE;
>     for (Tuple2<Integer, Long> groupOffset : keyGroupOffsets) {
>         int keyGroupIndex = groupOffset.f0;
>         long offset = groupOffset.f1;
>         // Check that restored key groups all belong to the backend.
>         Preconditions.checkState(keyGroupRange.contains(keyGroupIndex),
>                 "The key group must belong to the backend.");
>         fsDataInputStream.seek(offset);
>         int writtenKeyGroupIndex = inView.readInt();
>         Preconditions.checkState(writtenKeyGroupIndex == keyGroupIndex,
>                 "Unexpected key-group in restore.");
>         try (InputStream kgCompressionInStream =
>                 streamCompressionDecorator.decorateWithCompression(fsDataInputStream)) {
>             readKeyGroupStateData(
>                     kgCompressionInStream,
>                     kvStatesById,
>                     keyGroupIndex,
>                     numStates,
>                     readVersion);
>         }
>     }
> }
> {code}
> My state's keyGroupIndex is 81 and its keyGroupOffset is 3572. The next
> keyGroupOffset is 3611, so my state's offset range is 3572 to 3611. But after I
> added the new field to the POJO, Kryo reads more bytes than that and runs into the next key group.
>
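The overrun described in the report can be illustrated with a small, self-contained sketch. This is not Flink or Kryo code: it uses only java.io streams, and the record layout (a 4-byte key-group index followed by one int field), the values 7 and 9, and all class and method names are assumptions made up for the illustration. Two adjacent key groups are written with the "old" layout, then the first one is read back with a "new" layout that expects an extra long field. Because the reader is not bounded by the key group's offset range, the extra field is filled with bytes that belong to the next key group.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class KeyGroupOverrunSketch {

    // Hypothetical layout: each key group is [int keyGroupIndex][int id].
    static final int OLD_GROUP_SIZE = Integer.BYTES + Integer.BYTES;

    /** Writes key groups 81 and 82 back to back using the old one-field layout. */
    static byte[] writeTwoKeyGroups() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(81); // key-group index header
            out.writeInt(7);  // old record: single int field
            out.writeInt(82); // header of the next key group
            out.writeInt(9);  // record of the next key group
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Reads the first key group with the new two-field layout.
     * Returns {id, addedField, bytesConsumed}.
     */
    static long[] readFirstGroupWithNewLayout(byte[] data) {
        try {
            ByteArrayInputStream raw = new ByteArrayInputStream(data);
            DataInputStream in = new DataInputStream(raw);
            in.readInt();               // key-group index header (81)
            int id = in.readInt();      // field that was actually written
            long added = in.readLong(); // never written: taken from the next key group
            int consumed = data.length - raw.available();
            return new long[] {id, added, consumed};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = writeTwoKeyGroups();
        long[] result = readFirstGroupWithNewLayout(data);
        System.out.println("bytes owned by key group 81: " + OLD_GROUP_SIZE);
        System.out.println("bytes consumed by new reader: " + result[2]);
        System.out.println("value of added field: " + result[1]);
    }
}
```

In this sketch the added field is not empty or zero: it is built from the header and record of key group 82, which matches the symptom of an incorrect field value after restore. The same arithmetic applies to the reported offsets, where key group 81 owns 3611 - 3572 = 39 bytes.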
--
This message was sent by Atlassian Jira
(v8.3.4#803005)