[ 
https://issues.apache.org/jira/browse/BEAM-8157?focusedWorklogId=341494&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-341494
 ]

ASF GitHub Bot logged work on BEAM-8157:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Nov/19 21:40
            Start Date: 11/Nov/19 21:40
    Worklog Time Spent: 10m 
      Work Description: lukecwik commented on pull request #9997: [BEAM-8157] 
Fix key encoding issues for state requests with unknown coders / Improve 
debugging and testing
URL: https://github.com/apache/beam/pull/9997#discussion_r344920094
 
 

 ##########
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuser.java
 ##########
 @@ -141,6 +149,55 @@ public static ExecutableStage forGrpcPortRead(
         materializedPCollections);
   }
 
+  /**
+   * Patches all input coders of stateful ExecutableStage transforms. Stateful 
transforms always have
+   * a KvCoder as input. Flink partitions the data based on the serialized 
version of the key. This
+   * key must retain the same binary representation to be able to serve the 
state in state requests
+   * from the SDK Harness from the correct partition. If the binary 
representation does not match,
+   * this will result in inconsistent checkpoints because keys are associated 
to a Flink key group
+   * which does not belong to the key group range of the state handling 
operator.
+   */
+  private static RunnerApi.Components lengthPrefixKeyCoder(
 
 Review comment:
   Now that I have looked through the shared code more, I see that we don't 
have access to the wire coder at this point; we actually instantiate it in 
https://github.com/apache/beam/blob/6b3f7d62d3a66b750a7bd541a94646fdf735bbd7/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java#L174
   
   There we would have access to the exact coder id that we should be setting 
on the main input pcollection.
   
   That class seems like the appropriate place to do this modification and it 
is also the place where we instantiate the runner coder related to the user 
state specs.
   
   Sorry that I didn't realize this earlier when I made my suggestion to place 
it in the GreedyStageFuser.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 341494)
    Time Spent: 11.5h  (was: 11h 20m)

> Key encoding for state requests is not consistent across SDKs
> -------------------------------------------------------------
>
>                 Key: BEAM-8157
>                 URL: https://issues.apache.org/jira/browse/BEAM-8157
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.13.0
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Critical
>             Fix For: 2.17.0
>
>          Time Spent: 11.5h
>  Remaining Estimate: 0h
>
> The Flink runner requires the internal key to be encoded without a length 
> prefix (OUTER context). The user state request handler exposes a serialized 
> version of the key to the Runner. This key is encoded with the NESTED context 
> which may add a length prefix. We need to convert it to OUTER context to 
> match the Flink runner's key encoding.
> So far this has not caused the Flink Runner to behave incorrectly. However, 
> with the upcoming support for Flink 1.9, the state backend will not accept 
> requests for keys not part of any key group/partition of the operator. This 
> is very likely to happen with the encoding not being consistent.
> **NOTE** This is only applicable to the Java SDK, as the Python SDK uses 
> OUTER encoding for the key in state requests.
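
To illustrate the mismatch described above, here is a minimal sketch in plain Java. It does not use Beam's or Flink's actual classes: `encodeOuter`, `encodeNested`, and `partitionFor` are hypothetical helpers, and the array hash is only a stand-in for however a runner maps serialized keys to key groups. It shows that the same key yields different bytes with and without a varint length prefix, so a partition computed from one form need not match the other.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class KeyEncodingSketch {

  // OUTER-style encoding (illustrative): the raw UTF-8 bytes, no length prefix.
  static byte[] encodeOuter(String key) {
    return key.getBytes(StandardCharsets.UTF_8);
  }

  // NESTED-style encoding (illustrative): a varint length prefix, then the payload.
  static byte[] encodeNested(String key) {
    byte[] payload = key.getBytes(StandardCharsets.UTF_8);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    int length = payload.length;
    // Emit 7 bits per byte, setting the high bit while more bytes follow.
    while ((length & ~0x7F) != 0) {
      out.write((length & 0x7F) | 0x80);
      length >>>= 7;
    }
    out.write(length);
    out.write(payload, 0, payload.length);
    return out.toByteArray();
  }

  // Hypothetical stand-in for a runner's key-to-partition assignment.
  // Assumption: the partition depends only on the serialized key bytes.
  static int partitionFor(byte[] encodedKey, int numPartitions) {
    return Math.floorMod(Arrays.hashCode(encodedKey), numPartitions);
  }

  public static void main(String[] args) {
    byte[] outer = encodeOuter("key");
    byte[] nested = encodeNested("key");

    System.out.println("OUTER  bytes: " + Arrays.toString(outer));
    System.out.println("NESTED bytes: " + Arrays.toString(nested));

    // The two encodings differ, so the computed partitions may differ too.
    System.out.println("OUTER  partition: " + partitionFor(outer, 128));
    System.out.println("NESTED partition: " + partitionFor(nested, 128));
  }
}
```

In this sketch, stripping the prefix before the partition lookup (or adding it consistently on both sides) is what keeps the two views of the key aligned; which side to adjust is the design question the pull request discusses.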



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
