[ 
https://issues.apache.org/jira/browse/FLINK-16576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17069428#comment-17069428
 ] 

Congxian Qiu(klion26) commented on FLINK-16576:
-----------------------------------------------

The restore fails here because {{the *mapping of stateId and metaInfo is 
wrong*}}.

The mapping is wrong because we registered some metaInfos that do not belong to 
the current subtask. 
{code:java}
// HeapRestoreOperation#restore
// will register the metainfo
createOrCheckStateForMetaInfo(restoredMetaInfos, kvStatesById);

readStateHandleStateData(
   fsDataInputStream,
   inView,
   keyGroupsStateHandle.getGroupRangeOffsets(),
   kvStatesById, restoredMetaInfos.size(),
   serializationProxy.getReadVersion(),
   serializationProxy.isUsingKeyGroupCompression());


private void createOrCheckStateForMetaInfo(
   List<StateMetaInfoSnapshot> restoredMetaInfo,
   Map<Integer, StateMetaInfoSnapshot> kvStatesById) {

   for (StateMetaInfoSnapshot metaInfoSnapshot : restoredMetaInfo) {
      final StateSnapshotRestore registeredState;

      ......

      if (registeredState == null) {
         // constructing the mapping between stateId and metaInfo, even if the
         // current state handle does not belong to the current subtask
         kvStatesById.put(kvStatesById.size(), metaInfoSnapshot);
      }
   }
}
{code}
From the code above we can see that we always register the metaInfo, even if 
the current state handle does not belong to this subtask (such a 
KeyGroupStateHandle will contain metaInfo, EMPTY_KEYGROUP, empty offsets and 
the stateHandle data). Once the wrong metaInfo has been registered, the 
*{{mapping of stateId and metaInfo becomes wrong (when constructing the 
mapping, we assume that all the handles belong to the current subtask).}}* 
{{(RocksDBStateBackend does not construct such a mapping, so it does not 
encounter this error).}}

As a solution, I want to filter out these stateHandles when assigning state to 
subtasks in {{StateAssignmentOperation}}.
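
The filtering idea could look roughly like the sketch below. It is a simplified stand-in and not the actual StateAssignmentOperation code: the Range type and filterForSubtask are hypothetical names, and the layout (a p=2 savepoint over 128 key groups) is an assumption chosen to match the key-group range 64 to 95 from the report.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: these types are hypothetical stand-ins, not Flink classes.
class HandleFilterSketch {

    // Inclusive key-group range [start, end].
    static final class Range {
        final int start;
        final int end;

        Range(int start, int end) {
            this.start = start;
            this.end = end;
        }

        // True if both ranges share at least one key group.
        boolean intersects(Range other) {
            return start <= other.end && other.start <= end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + "]";
        }
    }

    // Keeps only the handle ranges that overlap the subtask's range.
    static List<Range> filterForSubtask(List<Range> handleRanges, Range subtaskRange) {
        List<Range> kept = new ArrayList<>();
        for (Range handleRange : handleRanges) {
            if (handleRange.intersects(subtaskRange)) {
                kept.add(handleRange);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Assumed layout: savepoint taken with p=2 over 128 key groups.
        List<Range> savepointHandles =
            Arrays.asList(new Range(0, 63), new Range(64, 127));
        // Range of the failing subtask from the report (restore with p=4).
        Range subtask = new Range(64, 95);

        System.out.println(filterForSubtask(savepointHandles, subtask));
    }
}
```

With this filter, the subtask only receives the handle covering key groups 64 to 127, so the metaInfo of the other handle is never registered on it.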

> State inconsistency on restore with memory state backends
> ---------------------------------------------------------
>
>                 Key: FLINK-16576
>                 URL: https://issues.apache.org/jira/browse/FLINK-16576
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.9.2, 1.10.0
>            Reporter: Nico Kruber
>            Assignee: Congxian Qiu(klion26)
>            Priority: Blocker
>             Fix For: 1.9.3, 1.10.1, 1.11.0
>
>
> I occasionally see a few state inconsistencies with the {{TopSpeedWindowing}} 
> example in Flink. Restore would fail with either of these causes, but only 
> for the memory state backends and only with some combinations of parallelism 
> I took the savepoint with and parallelism I restore the job with:
> {code:java}
> java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=64, 
> endKeyGroup=95} does not contain key group 97 {code}
> or
> {code:java}
> java.lang.NullPointerException
>       at 
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:280)
>  {code}
> or
> {code:java}
> java.io.IOException: Corrupt stream, found tag: 8
>       at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:217)
>  {code}
>  
> I managed to make it reproducible in a test that I quickly hacked together in 
> [https://github.com/NicoK/flink/blob/state.corruption.debug/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingSavepointRestoreITCase.java]
>  (please check out the whole repository since I had to change some 
> dependencies).
> In a bit more detail, this is what I discovered before, also with a manual 
> savepoint on S3:
> Savepoint that was taken with parallelism 2 (p=2) and shows the restore 
> failure in three different ways (all running in Flink 1.10.0; but I also see 
> it in Flink 1.9):
>  * first of all, if I try to restore with p=2, everything is fine
>  * if I restore with p=4 I get an exception like the one mentioned above:
> {code:java}
> 2020-03-11 15:53:35,149 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - 
> Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, 
> PassThroughWindowFunction) -> Sink: Print to Std. Out (3/4) 
> (2ecdb03905cc8a376d43b086925452a6) switched from RUNNING to FAILED.
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for 
> EvictingWindowOperator_90bea66de1c231edf33913ecd54406c1_(3/4) from any of the 
> 1 provided restore options.
>       at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
>       ... 9 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed 
> when trying to restore heap backend
>       at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
>       at 
> org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
>       at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>       at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>       ... 11 more
> Caused by: java.lang.IllegalArgumentException: 
> KeyGroupRange{startKeyGroup=64, endKeyGroup=95} does not contain key group 97
>       at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:161)
>       at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
>       at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
>       at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
>       at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
>       at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper.lambda$keyGroupReader$0(HeapPriorityQueueSnapshotRestoreWrapper.java:85)
>       at 
> org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:298)
>       at 
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:293)
>       at 
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
>       at 
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:153)
>       at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
>       ... 15 more
> {code}
>  * if I restore with p=3, I get the following exception instead:
> {code:java}
> 2020-03-11 21:40:28,390 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - 
> Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, 
> PassThroughWindowFunction) -> Sink: Print to Std. Out (3/3) 
> (2fb8acde321f6cbfec56c300153d6dea) switched from RUNNING to FAILED.
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for 
> EvictingWindowOperator_90bea66de1c231edf33913ecd54406c1_(3/3) from any of the 
> 1 provided restore options.
>       at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
>       ... 9 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed 
> when trying to restore heap backend
>       at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
>       at 
> org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
>       at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>       at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>       ... 11 more
> Caused by: java.lang.NullPointerException
>       at 
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:280)
>       at 
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:254)
>       at 
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:153)
>       at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
>       ... 15 more
> {code}
> While the latter error somewhat indicates that the savepoint may be corrupt 
> (or that the reader/deserializer messed up), it remains a mystery to me why I 
> can successfully restore with p=2.
>  * occasionally, for p=4, I also see this exception instead:
> {code:java}
> 10:23:12,046 WARN  
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure  - 
> Exception while restoring keyed state backend for 
> EvictingWindowOperator_90bea66de1c231edf33913ecd54406c1_(3/4) from 
> alternative (1/1), will retry while more alternatives are available.
> org.apache.flink.runtime.state.BackendBuildingException: Failed when trying 
> to restore heap backend
>       at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
>       at 
> org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
>       at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>       at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
>       at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Corrupt stream, found tag: 8
>       at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:217)
>       at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>       at 
> org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:133)
>       at 
> org.apache.flink.api.common.typeutils.base.ListSerializer.deserialize(ListSerializer.java:42)
>       at 
> org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
>       at 
> org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
>       at 
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:295)
>       at 
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:256)
>       at 
> org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:155)
>       at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
>       ... 15 more{code}
>  
> I narrowed the cause down to the state access inside {{DeltaTrigger}}: 
> Removing the cleanup in 
> {{org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger#clear()}} 
> did not change anything but removing state access in its {{onElement}} 
> (pictured below) seems to resolve the bug:
> {code:java}
> @Override
> public TriggerResult onElement(T element, long timestamp, W window, 
> TriggerContext ctx) throws Exception {
>    ValueState<T> lastElementState = ctx.getPartitionedState(stateDesc);
>    if (lastElementState.value() == null) {
>       lastElementState.update(element);
>       return TriggerResult.CONTINUE;
>    }
>    if (deltaFunction.getDelta(lastElementState.value(), element) > 
> this.threshold) {
>       lastElementState.update(element);
>       return TriggerResult.FIRE;
>    }
>    return TriggerResult.CONTINUE;
> } {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
