The Coder used for State/Timers in a StatefulDoFn is pulled out of the
input PCollection. If a Runner needs to partition by this coder, it
should ensure that the coder of this PCollection matches the Coder
used to create the serialized bytes that are used for partitioning,
whether or not this is length-prefixed [1].

Concretely, the graph looks like this:


Runner                          SDK Harness

WriteToGbk
    |
ReadFromGbk
    |
RunnerMapFn.processKeyValue(key, value)
    |
    WriteToDataChannel
            ------------------------>
                 ReadFromDataChannel
                               |
                           (pcIn)
                               |
                  MyStatefulDoFn.process(key, value)

Now the (key part of the) Coder of pcIn, which comes from the proto
that the Runner sent to the SDK, must match the (key part of the)
encoding used in WriteToGbk and ReadFromGbk. If a LengthPrefix is added
in one spot, it must be added in the other.
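
To make this concrete, here is a minimal sketch using the Python SDK's
coders (the key value is made up; only the resulting bytes matter):

    from apache_beam.coders.coders import BytesCoder, LengthPrefixCoder

    key = b"key"
    # Encoding without a length prefix, e.g. as used for partitioning:
    plain = BytesCoder().encode(key)                        # b'key'
    # The same key with a length prefix added, e.g. as produced by the
    # coder instantiated from the proto:
    prefixed = LengthPrefixCoder(BytesCoder()).encode(key)  # b'\x03key'
    assert plain != prefixed  # lookups keyed on one form miss the other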


[1] 
https://github.com/apache/beam/blob/release-2.17.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L1183

On Tue, Nov 5, 2019 at 1:25 PM Maximilian Michels <m...@apache.org> wrote:
>
> Hi,
>
> I wanted to get your opinion on something that I have been struggling
> with. It is about the coders for state requests in portable pipelines.
>
> In contrast to "classic" Beam, the Runner is not guaranteed to know
> which coder is used by the SDK. If the SDK happens to use a standard
> coder (also known as a model coder), we will also have it available at
> the Runner, provided that the Runner is written in one of the SDK
> languages (e.g. Java). However, when we do not have a standard coder,
> we treat the data from the SDK as a blob and just pass it around as bytes.
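
(To illustrate: the Runner can only recognize a coder by its URN. A toy
sketch of such a check, using a small subset of the real model coder
URNs; the function name is made up:)

    # Toy sketch: can the Runner understand this coder?
    MODEL_CODER_URNS = {
        "beam:coder:bytes:v1",
        "beam:coder:varint:v1",
        "beam:coder:kv:v1",
    }  # subset of the standard coders, for illustration

    def runner_understands(coder_urn: str) -> bool:
        return coder_urn in MODEL_CODER_URNS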
>
> Problem
> =======
>
> In the case of state requests, which the SDK Harness sends to the
> Runner, we would like the key associated with the state request to
> match the key of the element that led to initiating the state request.
>
> Example:
>
> Runner                 SDK Harness
> ------                 -----------
>
> KV["key","value"]  --> Process Element
>                                |
> LookupState("key") <-- Request state of "key"
>          |
>     State["key"]    --> Receive state
>
>
> For stateful DoFns, the Runner partitions the data based on the key. In
> Flink, this partitioning must not change during the lifetime of a
> pipeline because checkpointing otherwise breaks [0]. The key is
> extracted from the element and stored in its encoded form.
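
(To illustrate why the encoded form matters: a toy sketch of key-based
partitioning, not Flink's actual key-group logic; the function name is
made up:)

    def partition_for(encoded_key: bytes, parallelism: int) -> int:
        # Stand-in for Flink's key-group hashing: the partition is
        # derived from the *encoded* key bytes, so two different
        # encodings of the same logical key can land on different
        # partitions.
        return hash(encoded_key) % parallelism

    # Same logical key, with and without a length prefix:
    print(partition_for(b"key", 4), partition_for(b"\x03key", 4))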
>
> If we have a standard coder, the situation is basically the same as in
> the "classic" Runner, which takes the key and serializes it. However,
> when we have an SDK-specific coder, we do not know how it encodes. So
> far, we have been using the coder instantiated from the Proto, which is
> typically a LengthPrefixCoder[ByteArrayCoder] or similar[1]. We have had
> problems with this because the key encoding of Java SDK state requests
> did not match the key encoding on the Runner side [2]. In an attempt to
> fix those, it is now partly broken for portable Python pipelines.
> Partly, because it "only" affects non-standard coders.
>
> Non-standard coders yield the aforementioned
> LengthPrefixCoder[ByteArrayCoder]. Now, following the usual encoding
> scheme, we would simply encode the key using this coder. However, for
> state requests, the Python SDK leaves out the length prefix for certain
> coders, e.g. for primitives like int or byte. It is possible that one
> coder uses a length prefix while another doesn't, and we have no way of
> telling from the Runner side whether a length prefix has been used.
> This results in the keys not matching on the Runner side and the
> partitioning being broken.
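
(A concrete sketch of the ambiguity, again with the Python SDK's
coders; the key 42 is made up:)

    from apache_beam.coders.coders import LengthPrefixCoder, VarIntCoder

    key = 42
    # What the coder instantiated from the Proto produces:
    with_prefix = LengthPrefixCoder(VarIntCoder()).encode(key)  # b'\x01*'
    # What the SDK actually sends for a primitive coder, with the
    # prefix left out:
    without_prefix = VarIntCoder().encode(key)                  # b'*'
    # The Runner only sees bytes and cannot tell which convention
    # was used.
    assert with_prefix != without_prefix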
>
>
> How to solve this?
> ==================
>
> (1) Should this simply be fixed on the Python SDK side? One fix would be
> to always add a length prefix to the key in state requests, even for
> primitive coders like VarInt which do not use one.
>
> OR
>
> (2) Should the Runner detect that a non-standard coder is used? If so,
> it should just pass the bytes from the SDK Harness and never make an
> attempt to construct a coder based on the Proto.
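
(A sketch of what (2) could look like on the Runner side; hypothetical
names, not actual Beam code:)

    # The Runner never decodes a non-standard key; it only stores and
    # compares the raw bytes received from the SDK Harness.
    state_table = {}  # maps opaque key bytes to state bytes

    def handle_state_get(raw_key: bytes) -> bytes:
        # raw_key is treated as an opaque blob; no coder is involved.
        return state_table.get(raw_key, b"")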
>
>
> Thinking about it now, it seems pretty obvious that (2) is the most
> feasible way to avoid complications across all current and future SDKs
> for key encodings. Still, it is odd that the Proto contains coder
> information which is not usable.
>
> What do you think?
>
>
> Thanks,
> Max
>
>
> [0] It is possible to restart the pipeline and repartition the
> checkpointed data.
> [1]
> https://github.com/apache/beam/blob/c39752af5391fe698a2b4f1489c187ddd4d604c0/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L682
> [2] https://issues.apache.org/jira/browse/BEAM-8157
