[
https://issues.apache.org/jira/browse/BEAM-8157?focusedWorklogId=307854&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-307854
]
ASF GitHub Bot logged work on BEAM-8157:
----------------------------------------
Author: ASF GitHub Bot
Created on: 06/Sep/19 13:34
Start Date: 06/Sep/19 13:34
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #9484: [BEAM-8157] Remove
length prefix from state key for Flink's state backend
URL: https://github.com/apache/beam/pull/9484#discussion_r321737041
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -343,11 +343,11 @@ public void clear(K key, W window) {
}
private void prepareStateBackend(K key) {
- // Key for state request is shipped already encoded as ByteString,
- // this is mostly a wrapping with ByteBuffer. We still follow the
- // usual key encoding procedure.
- // final ByteBuffer encodedKey = FlinkKeyUtils.encodeKey(key, keyCoder);
- final ByteBuffer encodedKey = ByteBuffer.wrap(key.toByteArray());
+ // Key for state request is shipped already encoded as ByteString, but it is
Review comment:
Here is why this works for Flink <= 1.8 but definitely needs to be fixed
moving forward:
Keys for state requests are encoded incorrectly for the Flink Runner's state
backend because they may contain a length prefix, which is not in line with
how the key is encoded for data partitioning. This causes state to be written
to the wrong key group. However, the encoding scheme is consistent, so reading
from state in `ProcessElement` or `OnTimer` methods works correctly, including
when state access is combined with timers. However, this mismatch may cause
problems with checkpoints and savepoints.
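To make the mismatch concrete, here is a minimal Java sketch, assuming a `String` key whose bytes are length-prefixed with an unsigned VarInt, the layout Beam's `LengthPrefixCoder` writes (VarInt length, then payload). `keyGroup` is a hypothetical, simplified stand-in for Flink's key-group assignment, not Flink's actual hash (Flink's `KeyGroupRangeAssignment` applies a murmur hash to `key.hashCode()`); the class and method names are illustrative only. Because the prefixed byte sequence differs from the raw one, its hash, and therefore its key group, can differ as well, depending on the key and on `maxParallelism`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class KeyGroupMismatch {

  // Writes an unsigned VarInt length followed by the payload, modeled on the
  // byte layout produced by Beam's LengthPrefixCoder.
  static byte[] lengthPrefix(byte[] payload) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    int len = payload.length;
    while ((len & ~0x7F) != 0) {
      out.write((len & 0x7F) | 0x80); // low 7 bits, continuation bit set
      len >>>= 7;
    }
    out.write(len); // final byte, continuation bit clear
    out.write(payload, 0, payload.length);
    return out.toByteArray();
  }

  // Hypothetical, simplified stand-in for Flink's key-group assignment:
  // hash the key bytes, scramble the hash, and reduce it modulo maxParallelism.
  // This is NOT Flink's exact function; it only illustrates that the group
  // depends on the exact bytes of the key.
  static int keyGroup(byte[] keyBytes, int maxParallelism) {
    int h = ByteBuffer.wrap(keyBytes).hashCode(); // depends on content only
    h ^= h >>> 16;
    h *= 0x85ebca6b;
    h ^= h >>> 13;
    return Math.floorMod(h, maxParallelism);
  }

  public static void main(String[] args) {
    int maxParallelism = 128;
    byte[] raw = "user-42".getBytes(StandardCharsets.UTF_8); // as used for partitioning
    byte[] prefixed = lengthPrefix(raw); // as shipped with the state request

    System.out.println("raw bytes:      " + Arrays.toString(raw));
    System.out.println("prefixed bytes: " + Arrays.toString(prefixed));
    System.out.println("key group (partitioning, raw): " + keyGroup(raw, maxParallelism));
    System.out.println("key group (state, prefixed):   " + keyGroup(prefixed, maxParallelism));
  }
}
```

Running `main` prints both byte sequences and both key groups; the exact group numbers come from the stand-in hash and are not meant to match what Flink would compute.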
In Flink 1.9, the key encoding mismatch becomes visible because reading state
for a key that is not part of the local partition _can_ return a `null` value.
This does not happen every time, likely due to the state compaction logic.
This explains why `PortableStateExecutionTest` passes but
`PortableTimersExecutionTest` does not. That said, I think we should merge this
fix ASAP for the upcoming release.
Issue Time Tracking
-------------------
Worklog Id: (was: 307854)
Time Spent: 2h 10m (was: 2h)
> Flink state requests return wrong state in timers when encoded key is
> length-prefixed
> -------------------------------------------------------------------------------------
>
> Key: BEAM-8157
> URL: https://issues.apache.org/jira/browse/BEAM-8157
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.13.0
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Priority: Major
> Fix For: 2.16.0
>
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> Due to multiple changes made in BEAM-7126, the Flink internal key encoding is
> broken when the key is encoded with a length prefix. The Flink runner
> requires the internal key to be encoded without a length prefix.
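Stripping the prefix before the key reaches the state backend is one way to meet this requirement. Below is a minimal sketch, assuming the prefix is a single unsigned VarInt length as written by Beam's `LengthPrefixCoder`; `stripLengthPrefix` is a hypothetical helper, not the code from pull request #9484.

```java
import java.util.Arrays;

class StripLengthPrefix {

  // Hypothetical helper: removes a leading unsigned VarInt length prefix and
  // returns the payload bytes. Throws if the prefix is malformed or does not
  // match the number of remaining bytes (i.e. input was not length-prefixed).
  static byte[] stripLengthPrefix(byte[] encoded) {
    int length = 0;
    int shift = 0;
    int pos = 0;
    while (true) {
      if (pos >= encoded.length) {
        throw new IllegalArgumentException("truncated VarInt length prefix");
      }
      int b = encoded[pos++] & 0xFF;
      length |= (b & 0x7F) << shift; // accumulate 7 bits per byte
      if ((b & 0x80) == 0) {
        break; // continuation bit clear: prefix ends here
      }
      shift += 7;
      if (shift > 28) {
        throw new IllegalArgumentException("VarInt length prefix too long");
      }
    }
    int remaining = encoded.length - pos;
    if (length != remaining) {
      throw new IllegalArgumentException(
          "prefix says " + length + " bytes, but " + remaining + " remain");
    }
    return Arrays.copyOfRange(encoded, pos, encoded.length);
  }

  public static void main(String[] args) {
    // 0x03 is the VarInt-encoded length 3, followed by the payload "abc".
    byte[] prefixed = {0x03, 'a', 'b', 'c'};
    System.out.println(Arrays.toString(stripLengthPrefix(prefixed))); // prints [97, 98, 99]
  }
}
```

Validating the decoded length against the remaining bytes guards against silently stripping bytes from a key that was never length-prefixed.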