[ 
https://issues.apache.org/jira/browse/FLINK-19748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-19748:
----------------------------------------
    Fix Version/s: statefun-2.2.1
                   1.11.3
                   1.12.0

> StateFun's UnboundedFeedbackLogger should call startNewKeyGroup for all 
> assigned key groups
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-19748
>                 URL: https://issues.apache.org/jira/browse/FLINK-19748
>             Project: Flink
>          Issue Type: Bug
>          Components: Stateful Functions
>    Affects Versions: statefun-2.0.0, statefun-2.1.0, statefun-2.2.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.12.0, 1.11.3, statefun-2.2.1
>
>
> Currently, on commit, the {{UnboundedFeedbackLogger}} only calls 
> {{startNewKeyGroup}} on the raw keyed stream for key groups that actually 
> have logged messages:
> https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java#L102
> This means that it skips any key group that has no logged messages.
> This doesn't conform to the expected usage of Flink's 
> {{KeyedStateCheckpointOutputStream}}, which expects {{startNewKeyGroup}} to 
> be invoked for ALL key groups within the range.
> The reason is that, underneath, calling {{startNewKeyGroup}} also records 
> the starting stream offset position for the key group.
> However, when iterating through a raw keyed stream, the key group offsets 
> iterator {{KeyGroupRangeOffsets#KeyGroupOffsetsIterator}} doesn't take into 
> account that some key groups weren't written and therefore do not have 
> offsets defined, so the stream will seek to incorrect positions.
> Ultimately, if some key groups were skipped while writing to the raw keyed 
> stream, the following error is thrown on restore:
> {code}
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
>       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: java.io.IOException: position out of bounds
>       at org.apache.flink.runtime.state.StatePartitionStreamProvider.getStream(StatePartitionStreamProvider.java:58)
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:235)
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:167)
>       ... 9 more
> Caused by: java.io.IOException: position out of bounds
>       at org.apache.flink.runtime.state.memory.ByteStreamStateHandle$ByteStateHandleInputStream.seek(ByteStreamStateHandle.java:124)
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:442)
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:395)
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:228)
>       ... 10 more
> {code}
> h2. *Possible solutions*
> There are two possible solutions, one in StateFun and one in Flink:
> - This can be fixed in StateFun by ensuring that the feedback logger starts a 
> new key group for all key groups in range:
> {code}
> for (int keyGroupId : rawKeyedStream.getKeyGroupList()) {
>     rawKeyedStream.startNewKeyGroup(keyGroupId);
>     // write to stream if there are logged messages for this key group
> }
> {code}
> - Alternatively, we could change the 
> {{KeyGroupRangeOffsets#KeyGroupOffsetsIterator}} in Flink to skip key groups 
> that don't have a defined offset (i.e. {{startNewKeyGroup}} wasn't called for 
> these key groups).
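
To illustrate the failure mode and the StateFun-side fix described above, here is a minimal, self-contained sketch. It does not use Flink's classes: {{RawKeyedStream}}, {{commit}}, {{restores}}, and the {{NO_OFFSET_SET}} sentinel of -1 are hypothetical stand-ins chosen for this example, and the restore-side {{seek}} check is only assumed to behave like the bounds check behind the "position out of bounds" error in the stack trace.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical model of a raw keyed stream; not Flink's actual classes. */
public class KeyGroupOffsetsSketch {

    /** Assumed sentinel meaning "startNewKeyGroup was never called for this key group". */
    static final long NO_OFFSET_SET = -1L;

    /** Stand-in for a key-grouped output stream that records one start offset per key group. */
    static final class RawKeyedStream {
        private final int firstKeyGroup;
        private final long[] offsets;
        private final ByteArrayOutputStream out = new ByteArrayOutputStream();

        RawKeyedStream(int firstKeyGroup, int numKeyGroups) {
            this.firstKeyGroup = firstKeyGroup;
            this.offsets = new long[numKeyGroups];
            Arrays.fill(offsets, NO_OFFSET_SET);
        }

        /** Records the current stream position as the start offset of the key group. */
        void startNewKeyGroup(int keyGroupId) {
            offsets[keyGroupId - firstKeyGroup] = out.size();
        }

        void write(byte[] bytes) {
            out.write(bytes, 0, bytes.length);
        }

        long[] offsets() {
            return offsets.clone();
        }

        int length() {
            return out.size();
        }
    }

    /** Restore-side seek that rejects positions outside the written stream. */
    static void seek(long position, int streamLength) throws IOException {
        if (position < 0 || position > streamLength) {
            throw new IOException("position out of bounds");
        }
    }

    /** Writes logged messages; when startAll is false, key groups without messages are skipped. */
    static RawKeyedStream commit(Map<Integer, String> logged, int first, int count, boolean startAll) {
        RawKeyedStream stream = new RawKeyedStream(first, count);
        for (int keyGroupId = first; keyGroupId < first + count; keyGroupId++) {
            String message = logged.get(keyGroupId);
            if (message == null && !startAll) {
                continue; // behaviour described in the issue: no offset is recorded
            }
            stream.startNewKeyGroup(keyGroupId);
            if (message != null) {
                stream.write(message.getBytes(StandardCharsets.UTF_8));
            }
        }
        return stream;
    }

    /** Returns true if every recorded key group offset is a valid position to seek to. */
    static boolean restores(RawKeyedStream stream) {
        try {
            for (long offset : stream.offsets()) {
                seek(offset, stream.length());
            }
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Map<Integer, String> logged = new HashMap<>();
        logged.put(0, "a");
        logged.put(2, "ccc"); // key group 1 has no logged messages

        System.out.println("skip empty key groups: restores = " + restores(commit(logged, 0, 3, false)));
        System.out.println("start all key groups:  restores = " + restores(commit(logged, 0, 3, true)));
    }
}
```

In this model, skipping key group 1 leaves its offset at the sentinel, so the restore-side seek fails, whereas starting every key group in range gives each one a valid (possibly empty) region of the stream. The second proposed solution would instead correspond to having the restore loop skip sentinel offsets.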



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
