Tzu-Li (Gordon) Tai created FLINK-19748:
-------------------------------------------
Summary: StateFun's UnboundedFeedbackLogger should call
startNewKeyGroup for all assigned key groups
Key: FLINK-19748
URL: https://issues.apache.org/jira/browse/FLINK-19748
Project: Flink
Issue Type: Bug
Components: Stateful Functions
Affects Versions: statefun-2.2.0, statefun-2.1.0, statefun-2.0.0
Reporter: Tzu-Li (Gordon) Tai
Currently, on commit, the {{UnboundedFeedbackLogger}} calls
{{startNewKeyGroup}} on the raw keyed stream only for key groups that actually
have logged messages:
https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java#L102
As a result, any key group that has no logged messages is skipped.
This violates the expected usage of Flink's
{{KeyedStateCheckpointOutputStream}}, which requires {{startNewKeyGroup}} to be
invoked for ALL key groups within the assigned range.
The reason is that calling {{startNewKeyGroup}} also records the starting
stream offset for that key group.
When a raw keyed stream is read back, the key group offsets iterator
{{KeyGroupRangeOffsets#KeyGroupOffsetsIterator}} does not account for key
groups that were never written and therefore have no offset defined, so the
stream ends up seeking to an incorrect position.
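To make the mechanism concrete, here is a minimal, self-contained model of it. This is a sketch and not Flink's actual code: the class {{RawKeyedStreamModel}}, its methods, and the {{-1}} sentinel for "offset never set" are assumptions made for illustration. It records an offset only when {{startNewKeyGroup}} is called, and on restore it blindly seeks to every offset in the range.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;

/** Simplified stand-in for a raw keyed stream; NOT Flink's actual implementation. */
class RawKeyedStreamModel {
    // Assumption: key groups that were never started keep a sentinel offset.
    static final long NO_OFFSET = -1L;

    private final int firstKeyGroup;
    private final long[] offsets;
    private final ByteArrayOutputStream out = new ByteArrayOutputStream();

    RawKeyedStreamModel(int firstKeyGroup, int numKeyGroups) {
        this.firstKeyGroup = firstKeyGroup;
        this.offsets = new long[numKeyGroups];
        Arrays.fill(offsets, NO_OFFSET);
    }

    /** Records the current stream position as the start of the given key group. */
    void startNewKeyGroup(int keyGroupId) {
        offsets[keyGroupId - firstKeyGroup] = out.size();
    }

    void write(byte[] data) {
        out.write(data, 0, data.length);
    }

    /** Mimics a bounds-checked seek on the restored bytes. */
    private static void seek(int length, long position) throws IOException {
        if (position < 0 || position > length) {
            throw new IOException("position out of bounds");
        }
    }

    /** Seeks to every recorded offset; returns the first key group that fails, or -1. */
    int firstFailingKeyGroupOnRestore() {
        int length = out.size();
        for (int i = 0; i < offsets.length; i++) {
            try {
                seek(length, offsets[i]);
            } catch (IOException e) {
                return firstKeyGroup + i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        RawKeyedStreamModel stream = new RawKeyedStreamModel(0, 3);
        stream.startNewKeyGroup(0);
        stream.write(new byte[] {1, 2, 3});
        // Key group 1 has no logged messages and is skipped.
        stream.startNewKeyGroup(2);
        stream.write(new byte[] {4});
        System.out.println(stream.firstFailingKeyGroupOnRestore()); // prints 1
    }
}
```

In this model the skipped key group keeps the sentinel offset, so the restore-side seek fails for exactly that key group, even though the data of the other key groups was written correctly.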
Ultimately, if some key groups were skipped while writing to the raw keyed
stream, the following error will be thrown on restore:
{code}
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: java.io.IOException: position out of bounds
    at org.apache.flink.runtime.state.StatePartitionStreamProvider.getStream(StatePartitionStreamProvider.java:58)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:235)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:167)
    ... 9 more
Caused by: java.io.IOException: position out of bounds
    at org.apache.flink.runtime.state.memory.ByteStreamStateHandle$ByteStateHandleInputStream.seek(ByteStreamStateHandle.java:124)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:442)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl$KeyGroupStreamIterator.next(StreamTaskStateInitializerImpl.java:395)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:228)
    ... 10 more
{code}
**Possible solutions**
There are two possible solutions: fix this either in StateFun or in Flink.
- In StateFun, ensure that the feedback logger starts a new key group for
every key group in range:
{code}
for (int keyGroupId : rawKeyedStream.getKeyGroupList()) {
    rawKeyedStream.startNewKeyGroup(keyGroupId);
    // write to stream if there are logged messages for this key group
}
{code}
- Alternatively, change {{KeyGroupRangeOffsets#KeyGroupOffsetsIterator}} in
Flink to skip key groups that don't have a defined offset (i.e. key groups for
which {{startNewKeyGroup}} was never called).
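The second option could look roughly like the sketch below. It is a standalone illustration rather than a patch against Flink: the class {{SkippingOffsetsIterator}} is hypothetical, and it assumes that offsets of key groups which were never started carry a {{-1}} sentinel. The iterator yields (key group, offset) pairs and silently steps over entries with no defined offset.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;

/** Hypothetical offsets iterator that skips key groups without a defined offset. */
final class SkippingOffsetsIterator implements Iterator<Map.Entry<Integer, Long>> {
    // Assumption: sentinel meaning "startNewKeyGroup was never called".
    static final long NO_OFFSET = -1L;

    private final int firstKeyGroup;
    private final long[] offsets;
    private int index;

    SkippingOffsetsIterator(int firstKeyGroup, long[] offsets) {
        this.firstKeyGroup = firstKeyGroup;
        this.offsets = offsets;
        skipUndefined();
    }

    /** Moves the cursor forward to the next key group that has an offset. */
    private void skipUndefined() {
        while (index < offsets.length && offsets[index] == NO_OFFSET) {
            index++;
        }
    }

    @Override
    public boolean hasNext() {
        return index < offsets.length;
    }

    @Override
    public Map.Entry<Integer, Long> next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        Map.Entry<Integer, Long> entry =
                new SimpleImmutableEntry<>(firstKeyGroup + index, offsets[index]);
        index++;
        skipUndefined();
        return entry;
    }
}
```

With offsets {{0, -1, 5}} for a range starting at key group 10, this iterator returns only key groups 10 and 12. The trade-off compared to the StateFun-side fix is that readers no longer see an entry for every key group in the range, so any caller that relies on one entry per key group would need to be checked.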