[ 
https://issues.apache.org/jira/browse/BEAM-6650?focusedWorklogId=197440&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-197440
 ]

ASF GitHub Bot logged work on BEAM-6650:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Feb/19 09:13
            Start Date: 12/Feb/19 09:13
    Worklog Time Spent: 10m 
      Work Description: mxm commented on pull request #7810: [BEAM-6650] Add bundle test with checkpointing for keyed processing
URL: https://github.com/apache/beam/pull/7810#discussion_r255862018
 
 

 ##########
 File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 ##########
 @@ -295,13 +301,43 @@ public void initializeState(StateInitializationContext context) throws Exception
 
     sideInputReader = NullSideInputReader.of(sideInputs);
 
-    // maybe init by initializeState
-    if (nonKeyedStateInternals == null) {
-      if (keyCoder != null) {
-        nonKeyedStateInternals =
-            new FlinkKeyGroupStateInternals<>(keyCoder, getKeyedStateBackend());
-      } else {
-        nonKeyedStateInternals = new FlinkSplitStateInternals<>(getOperatorStateBackend());
+    if (keyCoder != null) {
+      nonKeyedStateInternals = new FlinkKeyGroupStateInternals<>(keyCoder, getKeyedStateBackend());
+    } else {
+      nonKeyedStateInternals = new FlinkSplitStateInternals<>(getOperatorStateBackend());
+    }
+
+    if (getKeyedStateBackend() != null) {
+      int numberOfKeyGroups = getKeyedStateBackend().getNumberOfKeyGroups();
+      KeyGroupsList localKeyGroupRange = getKeyedStateBackend().getKeyGroupRange();
+
+      for (KeyGroupStatePartitionStreamProvider streamProvider : context.getRawKeyedStateInputs()) {
+        DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(streamProvider.getStream());
+
+        int keyGroupIdx = streamProvider.getKeyGroupId();
+        checkArgument(
+            localKeyGroupRange.contains(keyGroupIdx),
+            "Key Group " + keyGroupIdx + " does not belong to the local range.");
+
+        // TODO We read the timers here just to progress the stream in order to restore
+        // FlinkKeyGroupStateInternals below. The timer service is automatically initialized
+        // before calling the initializeState method.
+        InternalTimerServiceSerializationProxy serializationProxy =
+            new InternalTimerServiceSerializationProxy(
+                new HashMap(),
+                getUserCodeClassloader(),
+                numberOfKeyGroups,
+                localKeyGroupRange,
+                this,
+                getProcessingTimeService(),
+                keyGroupIdx);
+        serializationProxy.read(streamProvider.getStream());
 
 Review comment:
   @aljoscha This is the hack I use to make it work for Flink 1.5. I think it 
would be nice for Flink to partially consume the input stream and then pass on 
the remainder to the user code. Is there any way to do that without hacks like 
the above?
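
   To make the idea concrete, here is a minimal sketch of "consume the
   framework-owned prefix, then hand the remainder to user code". It uses
   only java.io and is not Flink or Beam API: the class name, the snapshot
   layout (a timer count, the timer timestamps, then a UTF string of user
   state) and both method names are assumptions made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical illustration only; the layout below is NOT Flink's raw keyed state format.
final class PartialStreamConsumption {

  // Writes a framework-owned section (timer timestamps) followed by a user-owned section.
  static byte[] writeSnapshot(long[] timerTimestamps, String userState) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(timerTimestamps.length);
      for (long timestamp : timerTimestamps) {
        out.writeLong(timestamp);
      }
      out.writeUTF(userState);
    }
    return bytes.toByteArray();
  }

  // Reads and discards the timer section just to advance the stream,
  // then returns the user-owned remainder.
  static String readUserState(byte[] snapshot) throws IOException {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(snapshot))) {
      int timerCount = in.readInt();
      for (int i = 0; i < timerCount; i++) {
        in.readLong(); // value is intentionally ignored
      }
      return in.readUTF();
    }
  }

  public static void main(String[] args) throws IOException {
    byte[] snapshot = writeSnapshot(new long[] {10L, 20L, 30L}, "buffered-bundle-elements");
    System.out.println(readUserState(snapshot));
  }
}
```

   In this toy layout the reader has to know the prefix format to skip it,
   which is the same coupling the hack above has with the timer
   serialization format.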
 


Issue Time Tracking
-------------------

    Worklog Id:     (was: 197440)
    Time Spent: 20m  (was: 10m)

> FlinkRunner fails to checkpoint elements emitted during finishBundle
> --------------------------------------------------------------------
>
>                 Key: BEAM-6650
>                 URL: https://issues.apache.org/jira/browse/BEAM-6650
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Major
>             Fix For: 2.11.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Elements emitted during the finishBundle call in snapshotState are lost 
> after the pipeline is restored. This only happens when the operator is keyed.


