Thanks for the feedback thus far. Some more comments:

Instead, the runner knows ahead of time that it
will need to instantiate this coder, and should update the bundle
processor to specify KvCoder<LengthPrefixCoder<CustomCoder>,
VarIntCoder> as the coder so both can pull it out in a consistent way.

By "update the bundle processor", do you mean modifying the ProcessBundleDescriptor's BagUserState with the correct key coder? Conceptually that is possible, but the current implementation does not allow for it: https://github.com/apache/beam/blob/076a037e53ca39b61c1dfeb580527bc8d0371dc1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java#L284 It enforces ByteString, which tells the SDK Harness nothing about the desired encoding.

Since the above does not seem feasible, I see the following options:

(1) Modify the pipeline proto before translation and wrap a LengthPrefixCoder around non-standard key coders of stateful transforms. This changes the encoding of the entire element just to guarantee that the key coder used for state requests from the SDK Harness contains a LengthPrefixCoder. Not optimal.

(2) Add a new method WireCoders#instantiateRunnerWireKeyCoder which returns the correct key coder, i.e. for standard coders, the concrete coder, and for non-standard coders a ByteArrayCoder. We also need to ensure the key encoding on the Runner side is OUTER context, to avoid adding a length prefix to the encoded bytes. Basically, the non-standard coders result in a NOOP coder which does not touch the key bytes.

(3) Patch the Python SDK to ensure non-standard state key coders are always wrapped in a LengthPrefixCoder. That way, we can keep the existing logic on the Runner side.
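To make option (2) concrete, the selection logic could look roughly like the sketch below. The function name `instantiate_runner_wire_key_coder` and the URN set are illustrative only, not the actual WireCoders API:

```python
# Sketch of option (2): choose the runner-side key coder depending on
# whether the key coder is a standard (model) coder. All names here are
# hypothetical -- this is not the actual WireCoders API.

# URNs of standard coders the runner can instantiate natively
# (illustrative subset).
STANDARD_CODER_URNS = {
    "beam:coder:varint:v1",
    "beam:coder:bytes:v1",
    "beam:coder:kv:v1",
}

def instantiate_runner_wire_key_coder(key_coder_urn):
    """Return the coder the runner should use for state-request keys.

    Standard coders are instantiated concretely; non-standard coders
    degrade to a pass-through bytes coder (OUTER context, i.e. no extra
    length prefix), so the runner never re-interprets opaque SDK bytes.
    """
    if key_coder_urn in STANDARD_CODER_URNS:
        return ("concrete", key_coder_urn)
    # Non-standard: treat the encoded key as an opaque blob.
    return ("opaque-bytes", key_coder_urn)
```

The important property is that the non-standard branch is a NOOP on the key bytes, so whatever the SDK Harness sends is used verbatim for partitioning.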


Option (2) seems like the most practical.

-Max

On 06.11.19 17:26, Robert Bradshaw wrote:
On Wed, Nov 6, 2019 at 2:55 AM Maximilian Michels <m...@apache.org> wrote:

Let me try to clarify:

The Coder used for State/Timers in a StatefulDoFn is pulled out of the
input PCollection. If a Runner needs to partition by this coder, it
should ensure the coder of this PCollection matches with the Coder
used to create the serialized bytes that are used for partitioning
(whether or not this is length-prefixed).

That is essentially what I had assumed when I wrote the code. The
problem is the coder can be "pulled out" in different ways.

For example, let's say we have the following Proto PCollection coder
with non-standard coder "CustomCoder" as the key coder:

    KvCoder<CustomCoder, VarIntCoder>

From the Runner side, this currently looks like the following:

    PCol: KvCoder<LengthPrefixCoder<ByteArrayCoder>, VarIntCoder>
    Key:  LengthPrefixCoder<ByteArrayCoder>

This is, I think, where the error is. If the proto references
KvCoder<CustomCoder, VarIntCoder>, it should not be pulled out as
KvCoder<LengthPrefixCoder<ByteArrayCoder>, VarIntCoder>, as that
doesn't have the same encoding. Trying to instantiate such a coder
should be an error. Instead, the runner knows ahead of time that it
will need to instantiate this coder, and should update the bundle
processor to specify KvCoder<LengthPrefixCoder<CustomCoder>,
VarIntCoder> as the coder so both can pull it out in a consistent way.

When the coder is KvCoder<LengthPrefixCoder<CustomCoder>, VarIntCoder>
instantiating it as KvCoder<ByteArrayCoder, VarIntCoder> on the runner
is of course OK as they do have the same encoding.
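This equivalence can be shown with a toy sketch (not Beam's implementation; Beam uses a varint prefix, while a fixed 4-byte prefix keeps the sketch simple without changing the argument):

```python
# Toy sketch illustrating why LengthPrefixCoder<CustomCoder> and
# LengthPrefixCoder<ByteArrayCoder> share the same wire format: the
# prefix delimits the payload, so the runner can treat the payload as
# opaque bytes without understanding the custom encoding.

import struct

def length_prefix_encode(payload: bytes) -> bytes:
    # Fixed 4-byte big-endian prefix (real Beam uses a varint).
    return struct.pack(">I", len(payload)) + payload

def length_prefix_decode(data: bytes) -> bytes:
    (n,) = struct.unpack(">I", data[:4])
    return data[4:4 + n]

# Bytes produced by a "CustomCoder" known only to the SDK harness:
custom_encoded = b"\x07custom!"

wire = length_prefix_encode(custom_encoded)
# The runner, substituting ByteArrayCoder as the component, recovers the
# exact payload bytes.
assert length_prefix_decode(wire) == custom_encoded
```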

At the SDK Harness, we have the coder available:

    PCol: KvCoder<CustomCoder, VarIntCoder>
    Key:  CustomCoder

Currently, when the SDK Harness serializes a key for a state request,
the custom coder may happen to add a length prefix, or it may not. It
depends on the coder used. The correct behavior would be to use the same
representation as on the Runner side.

Specifically, "We have no way of telling from the Runner side, if a length prefix 
has been used or not." seems false

The Runner cannot inspect an unknown coder, it only has the opaque Proto
information available which does not allow introspection of non-standard
coders. With the current state, the Runner may think the coder adds a
length prefix but the Python SDK worker could choose to add none. This
produces an inconsistent key encoding. See above.

I think what's being conflated here is "the Coder has been wrapped in
a LengthPrefixCoder" vs. "the coder does length prefixing." These are
two orthogonal concepts. The runner in general only knows the former.

It looks like the key encoding for state requests on the Python SDK
Harness side is broken. For transferring elements of a PCollection, the
coders are obviously working correctly, but for encoding solely the key
of an element, there is a consistency issue.
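The inconsistency can be sketched in a few lines (illustrative only; the single-byte "VarInt" and fixed-size prefix are simplifications):

```python
# Sketch of the mismatch: if the SDK harness encodes a state-request key
# without a length prefix while the runner partitions on length-prefixed
# bytes, the two byte strings differ and the key lookup misses.

import struct

def varint_key(n: int) -> bytes:
    # Simplified single-byte "VarInt" encoding for small n (sketch only).
    return bytes([n])

def with_length_prefix(payload: bytes) -> bytes:
    return struct.pack(">I", len(payload)) + payload

runner_key = with_length_prefix(varint_key(42))   # what the runner stored
sdk_key = varint_key(42)                          # what the SDK sends

assert runner_key != sdk_key  # inconsistent encodings -> broken lookup
```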


-Max

On 06.11.19 05:35, Kenneth Knowles wrote:
Specifically, "We have no way of telling from the Runner side, if a
length prefix has been used or not." seems false. The runner has all the
information since length prefix is a model coder. Didn't we agree that
all coders should be self-delimiting in runner/SDK interactions,
requiring length-prefix only when there is an opaque or dynamic-length
value? I assume you mean that at runtime the worker for a given engine
does not know?

Kenn

On Tue, Nov 5, 2019 at 3:19 PM Luke Cwik <lc...@google.com
<mailto:lc...@google.com>> wrote:

     +1 to what Robert said.

     On Tue, Nov 5, 2019 at 2:36 PM Robert Bradshaw <rober...@google.com
     <mailto:rober...@google.com>> wrote:

         The Coder used for State/Timers in a StatefulDoFn is pulled out
         of the
         input PCollection. If a Runner needs to partition by this coder, it
         should ensure the coder of this PCollection matches with the Coder
         used to create the serialized bytes that are used for partitioning
         (whether or not this is length-prefixed).

         Concretely, the graph looks like


         Runner                          SDK Harness

         WriteToGbk
              |
         ReadFromGbk
              |
         RunnerMapFn.processKeyValue(key, value)
              |
              WriteToDataChannel
                      ------------------------>
                           ReadFromDataChannel
                                         |
                                     (pcIn)
                                         |
                            MyStatefulDoFn.process(key, value)

         Now the (key part of the) Coder of pcIn, which comes from the proto
         that the Runner sent to the SDK, must match the (key part of the)
         encoding used in WriteToGbk and ReadFromGbk. If a LengthPrefix is
         added
         in one spot, it must be added in the other.


         [1]
         
https://github.com/apache/beam/blob/release-2.17.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L1183

         On Tue, Nov 5, 2019 at 1:25 PM Maximilian Michels
         <m...@apache.org <mailto:m...@apache.org>> wrote:
          >
          > Hi,
          >
          > I wanted to get your opinion on something that I have been
         struggling
          > with. It is about the coders for state requests in portable
         pipelines.
          >
          > In contrast to "classic" Beam, the Runner is not guaranteed
         to know
          > which coder is used by the SDK. If the SDK happens to use a
         standard
          > coder (also known as model coder), we will also have it
         available at the
          > Runner, i.e. if the Runner is written in one of the SDK
         languages (e.g.
          > Java). However, when we do not have a standard coder, we just
         treat the
          data from the SDK as a blob and pass it around as bytes.
          >
          > Problem
          > =======
          >
          > In the case of state requests which the SDK Harness sends
         to the
          > Runner, we would like for the key associated with the state
         request to
          > match the key of the element which led to initiating the
         state request.
          >
          > Example:
          >
          > Runner                 SDK Harness
          > ------                 -----------
          >
          > KV["key","value"]  --> Process Element
          >                                |
          > LookupState("key") <-- Request state of "key"
          >          |
          >     State["key"]    --> Receive state
          >
          >
          > For stateful DoFns, the Runner partitions the data based on
         the key. In
          > Flink, this partitioning must not change during the lifetime of a
          > pipeline because the checkpointing otherwise breaks[0]. The
         key is
          > extracted from the element and stored encoded.
          >
          > If we have a standard coder, it is basically the same as in the
          > "classic" Runner which takes the key and serializes it.
         However, when we
          > have an SDK-specific coder, we basically do not know how it
         encodes. So
          > far, we have been using the coder instantiated from the
         Proto, which is
          > basically a LengthPrefixCoder[ByteArrayCoder] or similar[1].
         We have had
          > problems with this because the key encoding of Java SDK state
         requests
          > did not match the key encoding on the Runner side [2]. In an
         attempt to
          > fix those, it is now partly broken for portable Python pipelines.
          > Partly, because it "only" affects non-standard coders.
          >
          > Non-standard coders yield the aforementioned
          > LengthPrefixCoder[ByteArrayCoder]. Now, following the usual
         encoding
          > scheme, we would simply encode the key using this coder.
         However, for
          > state requests, the Python SDK leaves out the length prefix
         for certain
          > coders, e.g. for primitives like int or byte. It is possible
         that one
          > coder uses a length prefix, while another doesn't. We have no
         way of
          > telling from the Runner side, if a length prefix has been
         used or not.
          > This results in the keys not matching on the Runner side and
          > the partitioning being broken.
          >
          >
          > How to solve this?
          > ==================
          >
          > (1) Should this simply be fixed on the Python SDK side? One
         fix would be
          > to always append a length prefix to the key in state
         requests, even for
          > primitive coders like VarInt which do not use one.
          >
          > OR
          >
          > (2) Should the Runner detect that a non-standard coder is
         used? If so,
          > it should just pass the bytes from the SDK Harness and never
         make an
          > attempt to construct a coder based on the Proto.
          >
          >
          > Thinking about it now, it seems pretty obvious that (2) is
         the most
          > feasible way to avoid complications across all current and
         future SDKs
          > for key encodings. Still, it is odd that the Proto contains coder
          > information which is not usable.
          >
          > What do you think?
          >
          >
          > Thanks,
          > Max
          >
          >
          > [0] It is possible to restart the pipeline and repartition the
          > checkpointed data.
          > [1]
          >
         
https://github.com/apache/beam/blob/c39752af5391fe698a2b4f1489c187ddd4d604c0/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L682
          > [2] https://issues.apache.org/jira/browse/BEAM-8157
