[
https://issues.apache.org/jira/browse/BEAM-6650?focusedWorklogId=197440&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-197440
]
ASF GitHub Bot logged work on BEAM-6650:
----------------------------------------
Author: ASF GitHub Bot
Created on: 12/Feb/19 09:13
Start Date: 12/Feb/19 09:13
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #7810: [BEAM-6650] Add bundle test with checkpointing for keyed processing
URL: https://github.com/apache/beam/pull/7810#discussion_r255862018
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -295,13 +301,43 @@ public void initializeState(StateInitializationContext context) throws Exception
     sideInputReader = NullSideInputReader.of(sideInputs);
-    // maybe init by initializeState
-    if (nonKeyedStateInternals == null) {
-      if (keyCoder != null) {
-        nonKeyedStateInternals =
-            new FlinkKeyGroupStateInternals<>(keyCoder, getKeyedStateBackend());
-      } else {
-        nonKeyedStateInternals = new FlinkSplitStateInternals<>(getOperatorStateBackend());
+    if (keyCoder != null) {
+      nonKeyedStateInternals = new FlinkKeyGroupStateInternals<>(keyCoder, getKeyedStateBackend());
+    } else {
+      nonKeyedStateInternals = new FlinkSplitStateInternals<>(getOperatorStateBackend());
+    }
+
+    if (getKeyedStateBackend() != null) {
+      int numberOfKeyGroups = getKeyedStateBackend().getNumberOfKeyGroups();
+      KeyGroupsList localKeyGroupRange = getKeyedStateBackend().getKeyGroupRange();
+
+      for (KeyGroupStatePartitionStreamProvider streamProvider : context.getRawKeyedStateInputs()) {
+        DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(streamProvider.getStream());
+
+        int keyGroupIdx = streamProvider.getKeyGroupId();
+        checkArgument(
+            localKeyGroupRange.contains(keyGroupIdx),
+            "Key Group " + keyGroupIdx + " does not belong to the local range.");
+
+        // TODO We read the timers here just to progress the stream in order to restore
+        // FlinkKeyGroupStateInternals below. The timer service is automatically initialized
+        // before calling the initializeState method.
+        InternalTimerServiceSerializationProxy serializationProxy =
+            new InternalTimerServiceSerializationProxy(
+                new HashMap(),
+                getUserCodeClassloader(),
+                numberOfKeyGroups,
+                localKeyGroupRange,
+                this,
+                getProcessingTimeService(),
+                keyGroupIdx);
+        serializationProxy.read(streamProvider.getStream());
Review comment:
@aljoscha This is the hack I use to make it work for Flink 1.5. I think it
would be nice for Flink to partially consume the input stream and then pass on
the remainder to the user code. Is there any way to do that without hacks like
the above?
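The problem the comment describes (a consumer that only cares about the second half of a stream must first parse the first half) can be shown with a toy model. This is a minimal sketch under stated assumptions, NOT Flink's real raw keyed state format: each key group's stream is assumed to hold a timer section followed by a user-state section, and `PartialStreamSketch` and its methods are hypothetical names. Layout A mirrors the workaround in the diff, where the timers are deserialized and discarded just to advance the stream. Layout B is a hypothetical alternative in which the timer section carries a length prefix, so it can be skipped without interpreting its bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/**
 * Toy model (hypothetical, NOT Flink's real raw keyed state format): one key group's
 * raw stream holds a timer section followed by a user-state section.
 */
final class PartialStreamSketch {

  // Layout A: the timer section is written without a length prefix.
  static byte[] writeUnprefixed(long[] timers, String userState) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    writeTimers(out, timers);
    out.writeUTF(userState);
    out.flush();
    return bytes.toByteArray();
  }

  // Layout B (hypothetical): the timer section is prefixed with its byte length.
  static byte[] writeLengthPrefixed(long[] timers, String userState) throws IOException {
    ByteArrayOutputStream section = new ByteArrayOutputStream();
    DataOutputStream sectionOut = new DataOutputStream(section);
    writeTimers(sectionOut, timers);
    sectionOut.flush();

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeInt(section.size());
    section.writeTo(out);
    out.writeUTF(userState);
    out.flush();
    return bytes.toByteArray();
  }

  private static void writeTimers(DataOutputStream out, long[] timers) throws IOException {
    out.writeInt(timers.length);
    for (long timestamp : timers) {
      out.writeLong(timestamp);
    }
  }

  // Layout A: like the workaround in the diff, the timers must be fully deserialized
  // (and discarded) just to advance the stream to the user-state section.
  static String readUserStateByParsing(byte[] data) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
    int numTimers = in.readInt();
    for (int i = 0; i < numTimers; i++) {
      in.readLong();
    }
    return in.readUTF();
  }

  // Layout B: the timer section is skipped without interpreting its contents.
  static String readUserStateBySkipping(byte[] data) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
    int sectionLength = in.readInt();
    in.readFully(new byte[sectionLength]);
    return in.readUTF();
  }

  public static void main(String[] args) throws IOException {
    long[] timers = {1000L, 2000L, 3000L};
    System.out.println(readUserStateByParsing(writeUnprefixed(timers, "beam-state")));
    System.out.println(readUserStateBySkipping(writeLengthPrefixed(timers, "beam-state")));
  }
}
```

Running `main` prints `beam-state` twice. In this toy, layout B spends four extra bytes per key group in exchange for letting a reader that knows nothing about the timer format reach the remainder of the stream, which is roughly the capability the comment asks Flink to provide.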
Issue Time Tracking
-------------------
Worklog Id: (was: 197440)
Time Spent: 20m (was: 10m)
> FlinkRunner fails to checkpoint elements emitted during finishBundle
> --------------------------------------------------------------------
>
> Key: BEAM-6650
> URL: https://issues.apache.org/jira/browse/BEAM-6650
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Priority: Major
> Fix For: 2.11.0
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Elements emitted during the finishBundle call in snapshotState are lost
> after the pipeline is restored. This only happens when the operator is keyed.
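To make the reported failure mode concrete, here is a toy model, assuming a DoFn that batches its input and emits the batch in finishBundle, and assuming (as a simplification) that output produced while a snapshot is taken cannot be sent downstream and is buffered instead. It is not Beam's DoFnOperator; `BufferedBundleSketch` and its methods are hypothetical. It only illustrates the invariant a fix has to preserve: whatever finishBundle emits during snapshotState must end up in the checkpoint so it can be re-emitted after a restore.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (hypothetical names, NOT Beam's DoFnOperator). Assumes a DoFn that batches
 * its input and emits the batch in finishBundle, and assumes output produced during a
 * snapshot cannot be sent downstream, so it is buffered instead.
 */
final class BufferedBundleSketch {
  private final List<String> batch = new ArrayList<>(); // input of the open bundle
  private final List<String> buffered = new ArrayList<>(); // finishBundle output held back
  final List<String> downstream = new ArrayList<>(); // elements actually emitted

  void processElement(String element) {
    batch.add(element);
  }

  /** Finishes the open bundle during a snapshot and returns the checkpointed state. */
  List<String> snapshotState() {
    // finishBundle output goes to the buffer, not downstream.
    buffered.addAll(batch);
    batch.clear();
    // Invariant: the buffered output must be part of the checkpoint.
    return new ArrayList<>(buffered);
  }

  /** Builds a fresh operator from a checkpoint and re-emits the buffered output. */
  static BufferedBundleSketch restore(List<String> checkpoint) {
    BufferedBundleSketch op = new BufferedBundleSketch();
    op.downstream.addAll(checkpoint);
    return op;
  }

  public static void main(String[] args) {
    BufferedBundleSketch op = new BufferedBundleSketch();
    op.processElement("a");
    op.processElement("b");
    List<String> checkpoint = op.snapshotState();
    // Simulated failure: `op` is discarded before it ever flushes its buffer.
    BufferedBundleSketch restored = restore(checkpoint);
    System.out.println(restored.downstream); // prints [a, b]
  }
}
```

If `snapshotState` returned an empty list instead of the buffer, `restored.downstream` would be empty, which corresponds to the element loss the issue describes.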