On Wed, Nov 6, 2019 at 2:55 AM Maximilian Michels <m...@apache.org> wrote:
>
> Let me try to clarify:
>
> > The Coder used for State/Timers in a StatefulDoFn is pulled out of the
> > input PCollection. If a Runner needs to partition by this coder, it
> > should ensure the coder of this PCollection matches with the Coder
> > used to create the serialized bytes that are used for partitioning
> > (whether or not this is length-prefixed).
>
> That is essentially what I had assumed when I wrote the code. The
> problem is the coder can be "pulled out" in different ways.
>
> For example, let's say we have the following Proto PCollection coder
> with non-standard coder "CustomCoder" as the key coder:
>
>    KvCoder<CustomCoder, VarIntCoder>
>
>  From the Runner side, this currently looks like the following:
>
>    PCol: KvCoder<LengthPrefixCoder<ByteArrayCoder>, VarIntCoder>
>    Key:  LengthPrefixCoder<ByteArrayCoder>

This is, I think, where the error is. If the proto references
KvCoder<CustomCoder, VarIntCoder>, it should not be pulled out as
KvCoder<LengthPrefixCoder<ByteArrayCoder>, VarIntCoder>, as that does
not have the same encoding. Trying to instantiate such a coder should
be an error. Instead, the runner knows ahead of time that it will need
to instantiate this coder, and should update the bundle processor to
specify KvCoder<LengthPrefixCoder<CustomCoder>, VarIntCoder> as the
coder so that both sides pull it out in a consistent way.

When the coder is KvCoder<LengthPrefixCoder<CustomCoder>, VarIntCoder>,
instantiating it as KvCoder<LengthPrefixCoder<ByteArrayCoder>,
VarIntCoder> on the runner is of course OK, as those two do have the
same encoding.
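
To make the substitution concrete, here is a minimal sketch using the
plain Java coder classes (MyKey, myKey, and myCustomCoder are
hypothetical stand-ins for the SDK's non-standard key coder). The
runner relies only on the length prefix to delimit the value and
treats the payload as opaque bytes:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import org.apache.beam.sdk.coders.ByteArrayCoder;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.LengthPrefixCoder;

    // SDK side: the real key coder, wrapped for transport.
    Coder<MyKey> sdkKeyCoder = LengthPrefixCoder.of(myCustomCoder);

    // Runner side: same wire format, payload kept as opaque bytes.
    Coder<byte[]> runnerKeyCoder =
        LengthPrefixCoder.of(ByteArrayCoder.of());

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    sdkKeyCoder.encode(myKey, out);

    // The runner can decode, hash, and re-encode the key without
    // knowing anything about the custom coder, and round-tripping
    // yields byte-identical output.
    byte[] opaqueKey =
        runnerKeyCoder.decode(new ByteArrayInputStream(out.toByteArray()));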

> At the SDK Harness, we have the coder available:
>
>    PCol: KvCoder<CustomCoder, VarIntCoder>
>    Key:  CustomCoder
>
> Currently, when the SDK Harness serializes a key for a state request,
> the custom coder may happen to add a length prefix, or it may not. It
> depends on the coder used. The correct behavior would be to use the same
> representation as on the Runner side.
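
Right, and the mismatch is easy to see in the raw bytes. A
hypothetical sketch, assuming a key whose custom encoding happens to
be the three bytes 'f', 'o', 'o':

    // Runner's partition key: the length-prefixed opaque bytes.
    byte[] runnerKey = {0x03, 'f', 'o', 'o'};

    // Key bytes sent by an SDK whose coder skips the prefix.
    byte[] sdkKey = {'f', 'o', 'o'};

    // The state request misses because the encodings differ.
    boolean matches = java.util.Arrays.equals(runnerKey, sdkKey);  // false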
>
> > Specifically, "We have no way of telling from the Runner side, if a length 
> > prefix has been used or not." seems false
>
> The Runner cannot inspect an unknown coder, it only has the opaque Proto
> information available which does not allow introspection of non-standard
> coders. With the current state, the Runner may think the coder adds a
> length prefix but the Python SDK worker could choose to add none. This
> produces an inconsistent key encoding. See above.

I think what's being conflated here is "the Coder has been wrapped in
a LengthPrefixCoder" vs. "the coder does length prefixing." These are
two orthogonal concepts. The runner in general only knows the former.
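
To illustrate the distinction with two standard coders (a hedged
example; the byte values are shown just for intuition):

    import java.io.ByteArrayOutputStream;
    import org.apache.beam.sdk.coders.LengthPrefixCoder;
    import org.apache.beam.sdk.coders.VarIntCoder;

    ByteArrayOutputStream out = new ByteArrayOutputStream();

    // VarIntCoder's encoding is self-delimiting, but it does no
    // length prefixing: 42 encodes to the single byte [0x2a].
    VarIntCoder.of().encode(42, out);

    // Wrapping it yields a different coder with a different encoding:
    // 42 encodes to [0x01, 0x2a] (length, then payload).
    LengthPrefixCoder.of(VarIntCoder.of()).encode(42, out);

The runner can see the LengthPrefixCoder in the proto; whether the
inner coder's own encoding happens to be self-delimiting is invisible
to it, and irrelevant as long as both sides use the same coder.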

> It looks like the key encoding for state requests on the Python SDK
> Harness side is broken. For transferring elements of a PCollection, the
> coders are obviously working correctly, but for encoding solely the key
> of an element, there is a consistency issue.
>
>
> -Max
>
> On 06.11.19 05:35, Kenneth Knowles wrote:
> > Specifically, "We have no way of telling from the Runner side, if a
> > length prefix has been used or not." seems false. The runner has all the
> > information since length prefix is a model coder. Didn't we agree that
> > all coders should be self-delimiting in runner/SDK interactions,
> > requiring length-prefix only when there is an opaque or dynamic-length
> > value? I assume you mean that at runtime the worker for a given engine
> > does not know?
> >
> > Kenn
> >
> > On Tue, Nov 5, 2019 at 3:19 PM Luke Cwik <lc...@google.com> wrote:
> >
> >     +1 to what Robert said.
> >
> >     On Tue, Nov 5, 2019 at 2:36 PM Robert Bradshaw
> >     <rober...@google.com> wrote:
> >
> >         The Coder used for State/Timers in a StatefulDoFn is pulled out
> >         of the
> >         input PCollection. If a Runner needs to partition by this coder, it
> >         should ensure the coder of this PCollection matches with the Coder
> >         used to create the serialized bytes that are used for partitioning
> >         (whether or not this is length-prefixed).
> >
> >         Concretely, the graph looks like
> >
> >
> >         Runner                          SDK Harness
> >
> >         WriteToGbk
> >              |
> >         ReadFromGbk
> >              |
> >         RunnerMapFn.processKeyValue(key, value)
> >              |
> >              WriteToDataChannel
> >                      ------------------------>
> >                           ReadFromDataChannel
> >                                         |
> >                                     (pcIn)
> >                                         |
> >                            MyStatefulDoFn.process(key, value)
> >
> >         Now the (key part of the) Coder of pcIn, which comes from the proto
> >         that the Runner sent to the SDK, must match the (key part of the)
> >         encoding used in WriteToGbk and ReadFromGbk. If a LengthPrefix
> >         is added in one spot, it must be added in the other.
> >
> >
> >         [1] https://github.com/apache/beam/blob/release-2.17.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L1183
> >
> >         On Tue, Nov 5, 2019 at 1:25 PM Maximilian Michels
> >         <m...@apache.org> wrote:
> >          >
> >          > Hi,
> >          >
> >          > I wanted to get your opinion on something that I have been
> >          > struggling with. It is about the coders for state requests
> >          > in portable pipelines.
> >          >
> >          > In contrast to "classic" Beam, the Runner is not guaranteed
> >          > to know which coder is used by the SDK. If the SDK happens
> >          > to use a standard coder (also known as a model coder), it
> >          > will also be available at the Runner, i.e. if the Runner is
> >          > written in one of the SDK languages (e.g. Java). However,
> >          > when we do not have a standard coder, we treat the data from
> >          > the SDK as a blob and simply pass it around as bytes.
> >          >
> >          > Problem
> >          > =======
> >          >
> >          > In the case of state requests, which the SDK Harness sends
> >          > to the Runner, we would like the key associated with the
> >          > state request to match the key of the element which
> >          > triggered the state request.
> >          >
> >          > Example:
> >          >
> >          > Runner                 SDK Harness
> >          > ------                 -----------
> >          >
> >          > KV["key","value"]  --> Process Element
> >          >                                |
> >          > LookupState("key") <-- Request state of "key"
> >          >          |
> >          >     State["key"]    --> Receive state
> >          >
> >          >
> >          > For stateful DoFns, the Runner partitions the data based on
> >          > the key. In Flink, this partitioning must not change during
> >          > the lifetime of a pipeline because checkpointing otherwise
> >          > breaks[0]. The key is extracted from the element and stored
> >          > encoded.
> >          >
> >          > If we have a standard coder, it is basically the same as in
> >          > the "classic" Runner, which takes the key and serializes it.
> >          > However, when we have an SDK-specific coder, we do not know
> >          > how it encodes. So far, we have been using the coder
> >          > instantiated from the Proto, which is essentially a
> >          > LengthPrefixCoder[ByteArrayCoder] or similar[1]. We have had
> >          > problems with this because the key encoding of Java SDK
> >          > state requests did not match the key encoding on the Runner
> >          > side [2]. In an attempt to fix those, it is now partly
> >          > broken for portable Python pipelines. Partly, because it
> >          > "only" affects non-standard coders.
> >          >
> >          > Non-standard coders yield the aforementioned
> >          > LengthPrefixCoder[ByteArrayCoder]. Now, following the usual
> >          > encoding scheme, we would simply encode the key using this
> >          > coder. However, for state requests, the Python SDK leaves
> >          > out the length prefix for certain coders, e.g. for
> >          > primitives like int or byte. It is possible that one coder
> >          > uses a length prefix while another doesn't. We have no way
> >          > of telling from the Runner side whether a length prefix has
> >          > been used or not. This results in the keys not matching on
> >          > the Runner side and the partitioning being broken.
> >          >
> >          >
> >          > How to solve this?
> >          > ==================
> >          >
> >          > (1) Should this simply be fixed on the Python SDK side? One
> >          > fix would be to always add a length prefix to the key in
> >          > state requests, even for primitive coders like VarInt which
> >          > do not use one.
> >          >
> >          > OR
> >          >
> >          > (2) Should the Runner detect that a non-standard coder is
> >          > used? If so, it should just pass through the bytes from the
> >          > SDK Harness and never attempt to construct a coder based on
> >          > the Proto.
> >          >
> >          >
> >          > Thinking about it now, it seems pretty obvious that (2) is
> >          > the most feasible way to avoid complications with key
> >          > encodings across all current and future SDKs. Still, it is
> >          > odd that the Proto contains coder information which is not
> >          > usable.
> >          >
> >          > What do you think?
> >          >
> >          >
> >          > Thanks,
> >          > Max
> >          >
> >          >
> >          > [0] It is possible to restart the pipeline and repartition the
> >          > checkpointed data.
> >          > [1] https://github.com/apache/beam/blob/c39752af5391fe698a2b4f1489c187ddd4d604c0/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L682
> >          > [2] https://issues.apache.org/jira/browse/BEAM-8157
> >
