[ https://issues.apache.org/jira/browse/FLINK-19748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tzu-Li (Gordon) Tai closed FLINK-19748.
---------------------------------------
Resolution: Fixed
flink/master: 3367c238ad9de0dc50735df33ff09852a3427b1b
flink/release-1.11: 12a2ade31b4ba3beb9eb3fe1d26064223fc02fec
> KeyGroupRangeOffsets#KeyGroupOffsetsIterator should skip key groups that
> don't have a defined offset
> ----------------------------------------------------------------------------------------------------
>
> Key: FLINK-19748
> URL: https://issues.apache.org/jira/browse/FLINK-19748
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing, Runtime / State Backends
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.12.0, 1.11.3
>
>
> Currently, on commit the {{UnboundedFeedbackLogger}} only calls
> {{startNewKeyGroup}} on the raw keyed stream for key groups that actually
> have logged messages:
> https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java#L102
> This means that any key group without logged messages is skipped entirely.
> This doesn't conform to the expected usage of Flink's
> {{KeyedStateCheckpointOutputStream}}, which expects {{startNewKeyGroup}} to be
> invoked for ALL key groups within the range.
> The reason is that, internally, calling {{startNewKeyGroup}} also records
> the starting stream offset for that key group.
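To illustrate the offset bookkeeping described above, here is a minimal toy model. It is not Flink's {{KeyedStateCheckpointOutputStream}}. The class name {{ToyKeyedOutputStream}} and the {{NO_OFFSET_SET = -1}} sentinel for "never started" are assumptions made for illustration. A writer that only starts key groups 0 and 2 leaves key groups 1 and 3 without a defined offset.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

/**
 * Hypothetical toy model of a raw keyed state output stream (not Flink's class):
 * startNewKeyGroup records the stream position where each key group's data begins.
 */
public class ToyKeyedOutputStream {
    // Assumed sentinel meaning "startNewKeyGroup was never called for this key group".
    static final long NO_OFFSET_SET = -1L;

    private final ByteArrayOutputStream out = new ByteArrayOutputStream();
    private final int firstKeyGroup;
    private final long[] offsets;

    ToyKeyedOutputStream(int firstKeyGroup, int lastKeyGroup) {
        this.firstKeyGroup = firstKeyGroup;
        this.offsets = new long[lastKeyGroup - firstKeyGroup + 1];
        Arrays.fill(offsets, NO_OFFSET_SET); // every key group starts out undefined
    }

    /** Records the current stream position as the start of the given key group. */
    void startNewKeyGroup(int keyGroup) {
        offsets[keyGroup - firstKeyGroup] = out.size();
    }

    void write(byte[] bytes) {
        out.write(bytes, 0, bytes.length);
    }

    long offsetOf(int keyGroup) {
        return offsets[keyGroup - firstKeyGroup];
    }

    public static void main(String[] args) {
        ToyKeyedOutputStream stream = new ToyKeyedOutputStream(0, 3);
        // Mimic a writer that only starts key groups that actually have data (0 and 2).
        stream.startNewKeyGroup(0);
        stream.write(new byte[] {1, 2, 3});
        stream.startNewKeyGroup(2);
        stream.write(new byte[] {4, 5});
        for (int kg = 0; kg <= 3; kg++) {
            // Prints offsets 0, -1, 3, -1 for key groups 0..3.
            System.out.println("key group " + kg + " -> offset " + stream.offsetOf(kg));
        }
    }
}
```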
> However, when iterating through a raw keyed stream, the key group offsets
> iterator {{KeyGroupRangeOffsets#KeyGroupOffsetsIterator}} doesn't account for
> key groups that were never written and therefore have no defined offset, so
> the streams are seeked to incorrect positions.
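The following is a hypothetical sketch of the failure mode just described. {{NaiveRestore}}, {{SeekableBytes}} and the {{-1}} sentinel are illustrative assumptions, not Flink code. A restore loop visits every key group in the range and seeks to its recorded offset. When it reaches a key group that was never started, it seeks to the sentinel. The bounds check in this toy, loosely modelled on a byte-array state handle, then fails with the same message seen in the trace below.

```java
import java.io.IOException;

/** Hypothetical toy: restore-side reading that trusts every key group's offset. */
public class NaiveRestore {
    static final long NO_OFFSET_SET = -1L; // assumed "never started" sentinel

    /** Minimal seekable in-memory input, loosely modelled on a byte-array state handle. */
    static final class SeekableBytes {
        private final byte[] data;
        private int pos;

        SeekableBytes(byte[] data) {
            this.data = data;
        }

        void seek(long position) throws IOException {
            if (position < 0 || position > data.length) {
                throw new IOException("position out of bounds");
            }
            pos = (int) position;
        }

        int read() {
            return pos < data.length ? (data[pos++] & 0xFF) : -1;
        }
    }

    /** Seeks to every key group's offset, defined or not (the buggy behaviour). */
    static void restoreAll(long[] offsets, SeekableBytes in) throws IOException {
        for (int kg = 0; kg < offsets.length; kg++) {
            in.seek(offsets[kg]);
            System.out.println("key group " + kg + " starts with byte " + in.read());
        }
    }

    public static void main(String[] args) {
        byte[] data = {1, 2, 3, 4, 5};
        // Offsets as left behind by a writer that skipped key groups 1 and 3.
        long[] offsets = {0, NO_OFFSET_SET, 3, NO_OFFSET_SET};
        try {
            restoreAll(offsets, new SeekableBytes(data));
        } catch (IOException e) {
            // Key group 0 is read, then seek(-1) for key group 1 throws.
            System.out.println("restore failed: " + e.getMessage());
        }
    }
}
```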
> Ultimately, if any key groups were skipped while writing to the raw keyed
> stream, the following error is thrown on restore:
> {code}
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: java.io.IOException: position out of bounds
> at org.apache.flink.runtime.state.StatePartitionStreamProvider.getStream(StatePartitionStreamProvider.java:58)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:235)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:167)
> ... 9 more
> Caused by: java.io.IOException: position out of bounds
> at org.apache.flink.runtime.state.memory.ByteStreamStateHandle$ByteStateHandleInputStream.seek(ByteStreamStateHandle.java:124)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:442)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:395)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:228)
> ... 10 more
> {code}
> h2. *Solution*
> We change the {{KeyGroupRangeOffsets#KeyGroupOffsetsIterator}} in Flink to
> skip key groups that don't have a defined offset (i.e., key groups for which
> {{startNewKeyGroup}} was never called).
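The solution can be sketched as an iterator that advances past undefined entries in {{hasNext()}}, so callers only ever see key groups that were actually started. This is a minimal illustration under stated assumptions, not the committed Flink change. {{SkippingOffsetsIterator}}, the {{{keyGroup, offset}}} pair encoding, and the {{-1}} sentinel are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/**
 * Hypothetical sketch (not Flink's KeyGroupOffsetsIterator): iterates over
 * key group offsets, skipping key groups whose offset was never defined.
 */
public class SkippingOffsetsIterator implements Iterator<long[]> {
    static final long NO_OFFSET_SET = -1L; // assumed "never started" sentinel

    private final int firstKeyGroup;
    private final long[] offsets;
    private int index; // next position in offsets to inspect

    SkippingOffsetsIterator(int firstKeyGroup, long[] offsets) {
        this.firstKeyGroup = firstKeyGroup;
        this.offsets = offsets;
    }

    @Override
    public boolean hasNext() {
        // Advance past key groups for which startNewKeyGroup was never called.
        while (index < offsets.length && offsets[index] == NO_OFFSET_SET) {
            index++;
        }
        return index < offsets.length;
    }

    /** Returns {keyGroup, offset} for the next key group that has a defined offset. */
    @Override
    public long[] next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        long[] entry = {firstKeyGroup + index, offsets[index]};
        index++;
        return entry;
    }

    public static void main(String[] args) {
        long[] offsets = {0, NO_OFFSET_SET, 3, NO_OFFSET_SET};
        List<String> visited = new ArrayList<>();
        SkippingOffsetsIterator it = new SkippingOffsetsIterator(0, offsets);
        while (it.hasNext()) {
            long[] e = it.next();
            visited.add("kg" + e[0] + "@" + e[1]);
        }
        System.out.println(visited); // [kg0@0, kg2@3]
    }
}
```

Putting the skipping logic in {{hasNext()}} keeps {{next()}} trivial and makes repeated {{hasNext()}} calls idempotent. Existing restore loops therefore need no change beyond using the new iterator.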
--
This message was sent by Atlassian Jira
(v8.3.4#803005)