[
https://issues.apache.org/jira/browse/FLINK-16576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17069428#comment-17069428
]
Congxian Qiu(klion26) commented on FLINK-16576:
-----------------------------------------------
The restore fails here because {{the *mapping of stateId and metaInfo is
wrong*}}.
The mapping is wrong because we register some metaInfos that do not belong to
the current subtask.
{code:java}
// HeapRestoreOperation#restore
createOrCheckStateForMetaInfo(restoredMetaInfos, kvStatesById); // will register the metainfo
readStateHandleStateData(
    fsDataInputStream,
    inView,
    keyGroupsStateHandle.getGroupRangeOffsets(),
    kvStatesById, restoredMetaInfos.size(),
    serializationProxy.getReadVersion(),
    serializationProxy.isUsingKeyGroupCompression());

private void createOrCheckStateForMetaInfo(
        List<StateMetaInfoSnapshot> restoredMetaInfo,
        Map<Integer, StateMetaInfoSnapshot> kvStatesById) {
    for (StateMetaInfoSnapshot metaInfoSnapshot : restoredMetaInfo) {
        final StateSnapshotRestore registeredState;
        ......
        if (registeredState == null) {
            // constructs the mapping between stateId and metaInfo, even if the
            // current state handle does not belong to the current subtask
            kvStatesById.put(kvStatesById.size(), metaInfoSnapshot);
        }
    }
}
{code}
From the code above we can see that we always register the metaInfo, even if
the current state handle does not belong to the current subtask (such a
KeyGroupsStateHandle still contains the metaInfo, EMPTY_KEYGROUP, empty offsets
and the stateHandle data).
Once the wrong metaInfo has been registered, the *{{mapping of stateId and
metaInfo becomes wrong}}*, because when constructing the mapping we assume that
all the handles belong to the current subtask. {{(RocksDBStateBackend does not
construct such a mapping, so it does not encounter this error.)}}
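To make the failure mode concrete, here is a minimal, self-contained sketch of the registration pattern. It is not Flink code: the class, the method and the state names ("timers", "window-contents") are made up for illustration, and it assumes that the ids in a handle's data refer to positions in that handle's own metaInfo list.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class StateIdMappingSketch {

    /**
     * Mimics the pattern above: a state gets the next free id (the current
     * map size) only the first time its meta info is seen, no matter which
     * handle it came from.
     */
    static Map<Integer, String> registerAll(List<List<String>> metaInfosPerHandle) {
        Set<String> registered = new HashSet<>();
        Map<Integer, String> byId = new HashMap<>();
        for (List<String> metaInfos : metaInfosPerHandle) {
            for (String name : metaInfos) {
                if (registered.add(name)) {
                    byId.put(byId.size(), name);
                }
            }
        }
        return byId;
    }

    public static void main(String[] args) {
        // Hypothetical handle without key groups for this subtask; only its meta info is read.
        List<String> foreign = List.of("timers", "window-contents");
        // Hypothetical handle that belongs to this subtask (assumption: ids in
        // its data are positions in this list).
        List<String> owned = List.of("window-contents", "timers");

        Map<Integer, String> byId = registerAll(List.of(foreign, owned));
        for (int id = 0; id < owned.size(); id++) {
            System.out.println("id " + id + ": written as " + owned.get(id)
                    + ", resolved as " + byId.get(id));
        }
    }
}
```

In this sketch both ids of the owned handle resolve to the wrong state, which matches the kind of mix-up between timer and window state seen in the stack traces of the issue.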
As a solution, I want to filter out the stateHandles that do not belong to the
subtask when assigning state to subtasks in {{StateAssignmentOperation}}.
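A rough sketch of the kind of filter I have in mind, again not Flink code: {{Range}} and {{handlesForSubtask}} are hypothetical stand-ins, and the numbers assume a max parallelism of 128, which would match the {{KeyGroupRange{startKeyGroup=64, endKeyGroup=95}}} reported for subtask 3/4.

```java
import java.util.ArrayList;
import java.util.List;

public class HandleFilterSketch {

    /** Inclusive key-group range; a stand-in, not Flink's KeyGroupRange. */
    static final class Range {
        final int start;
        final int end;

        Range(int start, int end) {
            this.start = start;
            this.end = end;
        }

        /** True if the two ranges share at least one key group. */
        boolean overlaps(Range other) {
            return start <= other.end && other.start <= end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + "]";
        }
    }

    /** Keeps only the handle ranges that intersect the subtask's range. */
    static List<Range> handlesForSubtask(List<Range> handleRanges, Range subtaskRange) {
        List<Range> kept = new ArrayList<>();
        for (Range handleRange : handleRanges) {
            if (handleRange.overlaps(subtaskRange)) {
                kept.add(handleRange);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Savepoint taken with p=2: two handles covering 0..63 and 64..127.
        List<Range> handles = List.of(new Range(0, 63), new Range(64, 127));
        // Subtask 3/4 after rescaling to p=4 owns key groups 64..95.
        Range subtask = new Range(64, 95);
        System.out.println(handlesForSubtask(handles, subtask)); // prints [[64, 127]]
    }
}
```

With such a filter the subtask would never see the metaInfo of a handle it does not own, so the mapping would only be built from handles that really carry its data.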
> State inconsistency on restore with memory state backends
> ---------------------------------------------------------
>
> Key: FLINK-16576
> URL: https://issues.apache.org/jira/browse/FLINK-16576
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.9.2, 1.10.0
> Reporter: Nico Kruber
> Assignee: Congxian Qiu(klion26)
> Priority: Blocker
> Fix For: 1.9.3, 1.10.1, 1.11.0
>
>
> I occasionally see a few state inconsistencies with the {{TopSpeedWindowing}}
> example in Flink. Restore fails with one of the following errors, but only
> for the memory state backends and only with some combinations of the
> parallelism I took the savepoint with and the parallelism I restore the job
> with:
> {code:java}
> java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=64,
> endKeyGroup=95} does not contain key group 97 {code}
> or
> {code:java}
> java.lang.NullPointerException
> at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:280)
> {code}
> or
> {code:java}
> java.io.IOException: Corrupt stream, found tag: 8
> at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:217)
> {code}
>
> I managed to make it reproducible in a test that I quickly hacked together in
> [https://github.com/NicoK/flink/blob/state.corruption.debug/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingSavepointRestoreITCase.java]
> (please check out the whole repository since I had to change some
> dependencies).
> In a bit more detail, this is what I discovered before, also with a manual
> savepoint on S3:
> The savepoint was taken with parallelism 2 (p=2) and shows the restore
> failure in three different ways (all running in Flink 1.10.0, but I also see
> it in Flink 1.9):
> * first of all, if I try to restore with p=2, everything is fine
> * if I restore with p=4 I get an exception like the one mentioned above:
> {code:java}
> 2020-03-11 15:53:35,149 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph -
> Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator,
> PassThroughWindowFunction) -> Sink: Print to Std. Out (3/4)
> (2ecdb03905cc8a376d43b086925452a6) switched from RUNNING to FAILED.
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for
> EvictingWindowOperator_90bea66de1c231edf33913ecd54406c1_(3/4) from any of the
> 1 provided restore options.
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
> ... 9 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
> when trying to restore heap backend
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
> at
> org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> ... 11 more
> Caused by: java.lang.IllegalArgumentException:
> KeyGroupRange{startKeyGroup=64, endKeyGroup=95} does not contain key group 97
> at
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:161)
> at
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
> at
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
> at
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
> at
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
> at
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper.lambda$keyGroupReader$0(HeapPriorityQueueSnapshotRestoreWrapper.java:85)
> at
> org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:298)
> at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:293)
> at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
> at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:153)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
> ... 15 more
> {code}
> * if I restore with p=3, I get the following exception instead:
> {code:java}
> 2020-03-11 21:40:28,390 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph -
> Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator,
> PassThroughWindowFunction) -> Sink: Print to Std. Out (3/3)
> (2fb8acde321f6cbfec56c300153d6dea) switched from RUNNING to FAILED.
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for
> EvictingWindowOperator_90bea66de1c231edf33913ecd54406c1_(3/3) from any of the
> 1 provided restore options.
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
> ... 9 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
> when trying to restore heap backend
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
> at
> org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> ... 11 more
> Caused by: java.lang.NullPointerException
> at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:280)
> at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
> at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:153)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
> ... 15 more
> {code}
> While the latter error somewhat indicates that the savepoint may be corrupt
> (or that the reader/deserializer messed up), it remains a mystery to me why I
> can successfully restore with p=2.
> * occasionally, for p=4, I also see this exception instead:
> {code:java}
> 10:23:12,046 WARN
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure -
> Exception while restoring keyed state backend for
> EvictingWindowOperator_90bea66de1c231edf33913ecd54406c1_(3/4) from
> alternative (1/1), will retry while more alternatives are available.
> org.apache.flink.runtime.state.BackendBuildingException: Failed when trying
> to restore heap backend
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
> at
> org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Corrupt stream, found tag: 8
> at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:217)
> at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
> at
> org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:133)
> at
> org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:42)
> at
> org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
> at
> org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
> at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:295)
> at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:256)
> at
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:155)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
> ... 15 more{code}
>
> I narrowed the cause down to the state access inside {{DeltaTrigger}}:
> Removing the cleanup in
> {{org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger#clear()}}
> did not change anything, but removing the state access in its {{onElement}}
> (shown below) seems to resolve the bug:
> {code:java}
> @Override
> public TriggerResult onElement(T element, long timestamp, W window,
> TriggerContext ctx) throws Exception {
> ValueState<T> lastElementState = ctx.getPartitionedState(stateDesc);
> if (lastElementState.value() == null) {
> lastElementState.update(element);
> return TriggerResult.CONTINUE;
> }
> if (deltaFunction.getDelta(lastElementState.value(), element) >
> this.threshold) {
> lastElementState.update(element);
> return TriggerResult.FIRE;
> }
> return TriggerResult.CONTINUE;
> } {code}