+1 to what Robert said.

On Tue, Nov 5, 2019 at 2:36 PM Robert Bradshaw <rober...@google.com> wrote:

> The Coder used for State/Timers in a StatefulDoFn is pulled out of the
> input PCollection [1]. If a Runner needs to partition by this coder, it
> should ensure the coder of this PCollection matches the Coder
> used to create the serialized bytes that are used for partitioning
> (whether or not this is length-prefixed).
>
> Concretely, the graph looks like
>
>
> Runner                          SDK Harness
>
> WriteToGbk
>     |
> ReadFromGbk
>     |
> RunnerMapFn.processKeyValue(key, value)
>     |
>     WriteToDataChannel
>             ------------------------>
>                  ReadFromDataChannel
>                                |
>                            (pcIn)
>                                |
>                   MyStatefulDoFn.process(key, value)
>
> Now the (key part of the) Coder of pcIn, which comes from the proto
> that the Runner sent to the SDK, must match the (key part of the)
> encoding used in WriteToGbk and ReadFromGbk. If a LengthPrefix is added
> in one spot, it must be added in the other.
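>
> As a quick Python illustration of why the two spots must agree (using
> the SDK's VarIntCoder and LengthPrefixCoder, picked here only as an
> example key coder):
>
>   from apache_beam.coders.coders import LengthPrefixCoder, VarIntCoder
>
>   plain = VarIntCoder()
>   prefixed = LengthPrefixCoder(VarIntCoder())
>
>   # The same key yields different bytes under the two encodings, so a
>   # partition computed from one never matches a lookup keyed by the other.
>   assert plain.encode(300) != prefixed.encode(300)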
>
>
> [1]
> https://github.com/apache/beam/blob/release-2.17.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L1183
>
> On Tue, Nov 5, 2019 at 1:25 PM Maximilian Michels <m...@apache.org> wrote:
> >
> > Hi,
> >
> > I wanted to get your opinion on something that I have been struggling
> > with. It is about the coders for state requests in portable pipelines.
> >
> > In contrast to "classic" Beam, the Runner is not guaranteed to know
> > which coder is used by the SDK. If the SDK happens to use a standard
> > coder (also known as a model coder), it is also available to the
> > Runner, provided the Runner is written in one of the SDK languages
> > (e.g. Java). However, when we do not have a standard coder, we treat
> > the data from the SDK as a blob and simply pass it around as bytes.
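> >
> > To illustrate with a hypothetical Python coder (made up for this mail,
> > not from the codebase): the Runner cannot instantiate the following
> > from the Proto and can only shuttle its output around as bytes:
> >
> >   from apache_beam.coders import Coder
> >
> >   class MyKeyCoder(Coder):
> >     """An SDK-specific coder; opaque to a Java-based Runner."""
> >     def encode(self, value):
> >       return value.encode('utf-8')[::-1]  # some custom byte layout
> >     def decode(self, encoded):
> >       return encoded[::-1].decode('utf-8')
> >     def is_deterministic(self):
> >       return True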
> >
> > Problem
> > =======
> >
> > In the case of state requests, which the SDK Harness issues to the
> > Runner, we would like the key associated with the state request to
> > match the key of the element which triggered the state request.
> >
> > Example:
> >
> > Runner                 SDK Harness
> > ------                 -----------
> >
> > KV["key","value"]  --> Process Element
> >                                |
> > LookupState("key") <-- Request state of "key"
> >          |
> >     State["key"]    --> Receive state
> >
> >
> > For stateful DoFns, the Runner partitions the data based on the key. In
> > Flink, this partitioning must not change during the lifetime of a
> > pipeline because checkpointing would otherwise break [0]. The key is
> > extracted from the element and stored in its encoded form.
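> >
> > To sketch what is at stake (partition_for below is a hypothetical
> > stand-in for Flink's key-group assignment, not actual Flink code):
> >
> >   import zlib
> >
> >   def partition_for(encoded_key: bytes, num_partitions: int = 128) -> int:
> >     # The partition is a pure function of the encoded key bytes and
> >     # must stay stable for the lifetime of the pipeline.
> >     return zlib.crc32(encoded_key) % num_partitions
> >
> > Any state request whose key bytes differ from the bytes used at
> > partitioning time will be routed to the wrong partition.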
> >
> > If we have a standard coder, this is essentially the same as in the
> > "classic" Runner, which takes the key and serializes it. However, when
> > we have an SDK-specific coder, we do not know how it encodes. So far,
> > we have been using the coder instantiated from the Proto, which is
> > effectively a LengthPrefixCoder[ByteArrayCoder] or similar [1]. We have
> > had problems with this because the key encoding of Java SDK state
> > requests did not match the key encoding on the Runner side [2]. In an
> > attempt to fix those, it is now partly broken for portable Python
> > pipelines: partly, because it "only" affects non-standard coders.
> >
> > Non-standard coders yield the aforementioned
> > LengthPrefixCoder[ByteArrayCoder]. Following the usual encoding
> > scheme, we would simply encode the key using this coder. However, for
> > state requests, the Python SDK leaves out the length prefix for
> > certain coders, e.g. for primitives like int or byte. One coder may
> > thus use a length prefix while another does not, and we have no way of
> > telling from the Runner side whether a length prefix has been used.
> > This results in the keys not matching on the Runner side and the
> > partitioning being broken.
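> >
> > Concretely, a Python sketch of the mismatch (using Beam's real coders;
> > the exact coders involved depend on the pipeline):
> >
> >   from apache_beam.coders.coders import (
> >       BytesCoder, LengthPrefixCoder, VarIntCoder)
> >
> >   # What the Runner instantiates from the Proto for a non-standard coder:
> >   runner_key_coder = LengthPrefixCoder(BytesCoder())
> >
> >   raw_key = VarIntCoder().encode(42)              # b'*', no prefix
> >   partitioned = runner_key_coder.encode(raw_key)  # b'\x01*', prefixed
> >
> >   # If the SDK sends raw_key in the state request while the Runner
> >   # partitioned on the prefixed form, the two never match:
> >   assert raw_key != partitioned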
> >
> >
> > How to solve this?
> > ==================
> >
> > (1) Should this simply be fixed on the Python SDK side? One fix would be
> > to always add a length prefix to the key in state requests, even for
> > primitive coders like VarInt which do not use one.
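> >
> > Sketched in Python (illustrative only, not the actual harness code):
> >
> >   from apache_beam.coders.coders import LengthPrefixCoder, VarIntCoder
> >
> >   key_coder = VarIntCoder()  # emits no length prefix on its own
> >   # For state requests, always wrap, even for primitive coders:
> >   state_key_bytes = LengthPrefixCoder(key_coder).encode(42)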
> >
> > OR
> >
> > (2) Should the Runner detect that a non-standard coder is used? If so,
> > it should just pass through the bytes from the SDK Harness verbatim and
> > never attempt to construct a coder based on the Proto.
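> >
> > Sketched (again illustrative; the real logic would live in the Runner,
> > e.g. in the Flink translator [1]):
> >
> >   state_by_key = {}  # encoded-key bytes -> state, kept by the Runner
> >
> >   def handle_state_request(key_bytes: bytes):
> >     # For a non-standard coder, never instantiate a coder from the
> >     # Proto; use the SDK's bytes verbatim for partitioning and lookup.
> >     return state_by_key.get(key_bytes)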
> >
> >
> > Thinking about it now, it seems pretty obvious that (2) is the most
> > feasible way to avoid complications across all current and future SDKs
> > for key encodings. Still, it is odd that the Proto contains coder
> > information which is not usable.
> >
> > What do you think?
> >
> >
> > Thanks,
> > Max
> >
> >
> > [0] It is possible to restart the pipeline and repartition the
> > checkpointed data.
> > [1]
> >
> https://github.com/apache/beam/blob/c39752af5391fe698a2b4f1489c187ddd4d604c0/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L682
> > [2] https://issues.apache.org/jira/browse/BEAM-8157
>
