And by "I wasn't clear" I meant "I misread the options".

On Fri, Nov 8, 2019, 4:14 PM Robert Burke <rob...@frantil.com> wrote:

> Reading back, I wasn't clear: the Go SDK does Option (1), putting the LP
> explicitly during encoding [1] for the runner proto, and explicitly expects
> LPs to contain a custom coder URN on decode for execution [2]. (Modulo an
> old bug in Dataflow where the urn was empty)
>
>
> [1]
> https://github.com/apache/beam/blob/4364a214dfe6d8d5dd84b1bb91d579f466492ca5/sdks/go/pkg/beam/core/runtime/graphx/coder.go#L348
> [2]
> https://github.com/apache/beam/blob/4364a214dfe6d8d5dd84b1bb91d579f466492ca5/sdks/go/pkg/beam/core/runtime/graphx/coder.go#L219
>
>
> On Fri, Nov 8, 2019, 10:28 AM Robert Bradshaw <rober...@google.com> wrote:
>
>> On Fri, Nov 8, 2019 at 2:09 AM jincheng sun <sunjincheng...@gmail.com>
>> wrote:
>> >
>> > Hi,
>> >
>> > Sorry for my late reply. It seems the conclusion has been reached. I
>> just want to share my personal thoughts.
>> >
>> > Generally, both option 1 and 3 make sense to me.
>> >
>> > >> The key concept here is not "standard coder" but "coder that the
>> > >> runner does not understand." This knowledge is only in the runner.
>> > >> Also has the downside of (2).
>> >
>> > >Yes, I had assumed "non-standard" and "unknown" are the same, but the
>> > >latter can be a subset of the former, i.e. if a Runner does not support
>> > >all of the standard coders for some reason.
>> >
>> > I also assume that "non-standard" and "unknown" are the same.
>> > Currently, the runner side [1] decides whether a coder is unknown (and
>> > wraps it with a length prefix coder) according to whether the coder is
>> > among the standard coders. It does not communicate with the harness to
>> > make this decision.
>> >
>> > So, from my point of view, we can update the PR according to option 1
>> or 3.
>> >
>> > [1]
>> https://github.com/apache/beam/blob/66a67e6580b93906038b31ae7070204cec90999c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L62
>>
>> That list is populated in Java code [1] and has typically been a
>> subset of what is in the proto file. Things like StringUtf8Coder and
>> DoubleCoder have been added at different times to different SDKs and
>> Runners, sometimes long after the URN is in the proto. Having to keep
>> this list synchronized (and versioned) would be a regression.
>>
>> [1]
>> https://github.com/apache/beam/blob/release-2.17.0/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java
>>
>> The PR taking approach (1) looks good at a first glance (I see others
>> are reviewing it). Thanks.
>>
>> > Maximilian Michels <m...@apache.org> wrote on Fri, Nov 8, 2019 at 3:35 AM:
>> >>
>> >> > While the Go SDK doesn't yet support a State API, Option 3) is what
>> the Go SDK does for all non-standard coders (aka custom coders) anyway.
>> >>
>> >> For wire transfer, the Java Runner also adds a LengthPrefixCoder for
>> the
>> >> coder and its subcomponents. The problem is that this is an implicit
>> >> assumption made. In the Proto, we do not have this represented. This is
>> >> why **for state requests**, we end up with a
>> >> "LengthPrefixCoder[CustomCoder]" on the Runner and a "CustomCoder" on
>> >> the SDK Harness side. Note that the Python Harness does wrap unknown
>> >> coders in a LengthPrefixCoder for transferring regular elements, but
>> the
>> >> LengthPrefixCoder is not preserved for the state requests.
>> >>
>> >> In that sense (3) is good because it follows this implicit notion of
>> >> adding a LengthPrefixCoder for wire transfer, but applies it to state
>> >> requests.
>> >>
>> >> However, option (1) is most reliable because the LengthPrefixCoder is
>> >> actually in the Proto. So "CustomCoder" will always be represented as
>> >> "LengthPrefixCoder[CustomCoder]", and only standard coders will be
>> added
>> >> without a LengthPrefixCoder.
>> >>
>> >> > I'd really like to avoid implicit agreements about how the coder that
>> >> > should be used differs from what's specified in the proto in
>> different
>> >> > contexts.
>> >>
>> >> Option (2) would work on top of the existing logic because replacing a
>> >> non-standard coder with a "NOOP coder" would just be used by the Runner
>> >> to produce a serialized version of the key for partitioning. Flink
>> >> always operates on the serialized key, be it standard or non-standard
>> >> coder. It wouldn't be necessary to change any of the existing wire
>> >> transfer logic or representation. I understand that it would be less
>> >> ideal, but maybe easier to fix for the release.
>> >>
>> >> > The key concept here is not "standard coder" but "coder that the
>> >> > runner does not understand." This knowledge is only in the runner.
>> >> > Also has the downside of (2).
>> >>
>> >> Yes, I had assumed "non-standard" and "unknown" are the same, but the
>> >> latter can be a subset of the former, i.e. if a Runner does not support
>> >> all of the standard coders for some reason.
>> >>
>> >> > This means that the wire format that the runner sends for the "key"
>> represents the exact same wire format it will receive for state requests.
>> >>
>> >> The wire format for the entire element is the same. Otherwise we
>> >> wouldn't be able to process data between the Runner and the SDK
>> Harness.
>> >> However, the problem is that the way the Runner instantiates the key
>> >> coder to partition elements, does not match how the SDK encodes the key
>> >> when it sends a state request to the Runner. Conceptually, those two
>> >> situations should be the same, but in practice they are not.
>> >>
>> >>
>> >> Now that I have thought about it again, option (1) is probably the most
>> >> explicit and in that sense cleanest. However, option (3) is kind of fair
>> >> because it would just replicate the implicit LengthPrefixCoder behavior
>> >> we have for general wire transfer also for state requests. Option (2), I
>> >> suppose, is the most implicit and runner-specific, and should probably be
>> >> avoided in the long run.
>> >>
>> >> So I'd probably opt for (1) and I would update the PR[1] rather soon
>> >> because this currently blocks the release, as this is a regression from
>> >> 2.16.0.[2]
>> >>
>> >>
>> >> -Max
>> >>
>> >> [1] https://github.com/apache/beam/pull/9997
>> >> [2] (In 2.16.0 it worked for Python because the Runner used a
>> >> ByteArrayCoder with the OUTER encoding context for the key which was
>> >> basically option (2). The only problem was that, for standard coders, the Java
>> >> SDK Harness produced non-matching state request keys, due to it using
>> >> the NESTED context.)
>> >>
>> >> On 07.11.19 18:01, Luke Cwik wrote:
>> >> >
>> >> >
>> >> > On Thu, Nov 7, 2019 at 8:22 AM Robert Bradshaw <rober...@google.com> wrote:
>> >> >
>> >> >     On Thu, Nov 7, 2019 at 6:26 AM Maximilian Michels <m...@apache.org> wrote:
>> >> >      >
>> >> >      > Thanks for the feedback thus far. Some more comments:
>> >> >      >
>> >> >      > > Instead, the runner knows ahead of time that it
>> >> >      > > will need to instantiate this coder, and should update the
>> bundle
>> >> >      > > processor to specify KvCoder<LengthPrefixCoder<CustomCoder>,
>> >> >      > > VarIntCoder> as the coder so both can pull it out in a
>> >> >     consistent way.
>> >> >      >
>> >> >      > By "update the bundle processor", do you mean modifying the
>> >> >      > ProcessBundleDescriptor's BagUserState with the correct key
>> coder?
>> >> >      > Conceptually that is possible, but the current implementation
>> >> >     does not
>> >> >      > allow for this to happen:
>> >> >      >
>> >> >
>> https://github.com/apache/beam/blob/076a037e53ca39b61c1dfeb580527bc8d0371dc1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java#L284
>> >> >      > It enforces ByteString which does not tell the SDK Harness
>> anything
>> >> >      > about the desired encoding.
>> >> >
>> >> >     I meant update the ProcessBundleDescriptor proto that is sent to the
>> >> >     SDK
>> >> >     https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L140,
>> >> >     essentially option (1).
>> >> >
>> >> >
>> >> > For clarity, the "key" coder is specified by the stateful ParDo's
>> main
>> >> > input PCollection. This means that the ProcessBundleDescriptor should
>> >> > have something that has the length prefix as part of the remote grpc
>> >> > port specification AND the PCollection that follows it which is the
>> main
>> >> > input for the stateful ParDo. This means that the wire format that
>> the
>> >> > runner sends for the "key" represents the exact same wire format it
>> will
>> >> > receive for state requests.
>> >> >
>> >> > I see what you mean Max,
>> >> >
>> https://github.com/apache/beam/blob/076a037e53ca39b61c1dfeb580527bc8d0371dc1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java#L284
>> >> > could change to represent the actual coder (whether it be
>> >> > LengthPrefixCoder<BytesCoder> or some other model coder combination).
>> >> > Currently that logic assumes that the runner can perform the
>> backwards
>> >> > mapping by decoding the bytestring with the appropriate coder.
>> >> >
>> >> >      > Since the above does not seem feasible, I see the following
>> options:
>> >> >      >
>> >> >      > (1) Modify the pipeline Proto before the translation and wrap
>> a
>> >> >      > LengthPrefixCoder around non-standard key coders for stateful
>> >> >      > transforms. This would change the encoding for the entire
>> >> >     element, to be
>> >> >      > sure that the key coder for state requests contains a
>> >> >     LengthPrefixCoder
>> >> >      > for state requests from the SDK Harness. Not optimal.
>> >> >
>> >> >     Yes. The contract should be that both the runner and SDK use the
>> >> >     coders that are specified in the proto. The runner controls the
>> proto,
>> >> >     and should ensure it only sends protos it will be able to handle
>> the
>> >> >     SDK responding to. I'm not seeing why this is sub-optimal.
>> >> >
>> >> >      > (2) Add a new method WireCoders#instantiateRunnerWireKeyCoder
>> which
>> >> >      > returns the correct key coder, i.e. for standard coders, the
>> concrete
>> >> >      > coder, and for non-standard coders a ByteArrayCoder. We also
>> need to
>> >> >      > ensure the key encoding on the Runner side is OUTER context,
>> to avoid
>> >> >      > adding a length prefix to the encoded bytes. Basically, the
>> >> >     non-standard
>> >> >      > coders result in a NOOP coder which does not touch the key
>> bytes.
>> >> >
>> >> >     I'd really like to avoid implicit agreements about how the coder
>> that
>> >> >     should be used differs from what's specified in the proto in
>> different
>> >> >     contexts.
>> >> >
>> >> >      > (3) Patch the Python SDK to ensure non-standard state key
>> coders are
>> >> >      > always wrapped in a LengthPrefixCoder. That way, we can keep
>> the
>> >> >      > existing logic on the Runner side.
>> >> >
>> >> >     The key concept here is not "standard coder" but "coder that the
>> >> >     runner does not understand." This knowledge is only in the
>> runner.
>> >> >     Also has the downside of (2).
>> >> >
>> >> >      > Option (2) seems like the most practical.
>> >> >      >
>> >> >      > -Max
>> >> >      >
>> >> >      > On 06.11.19 17:26, Robert Bradshaw wrote:
>> >> >      > > On Wed, Nov 6, 2019 at 2:55 AM Maximilian Michels <m...@apache.org> wrote:
>> >> >      > >>
>> >> >      > >> Let me try to clarify:
>> >> >      > >>
>> >> >      > >>> The Coder used for State/Timers in a StatefulDoFn is
>> pulled
>> >> >     out of the
>> >> >      > >>> input PCollection. If a Runner needs to partition by this
>> >> >     coder, it
>> >> >      > >>> should ensure the coder of this PCollection matches with
>> the
>> >> >     Coder
>> >> >      > >>> used to create the serialized bytes that are used for
>> >> >     partitioning
>> >> >      > >>> (whether or not this is length-prefixed).
>> >> >      > >>
>> >> >      > >> That is essentially what I had assumed when I wrote the
>> code. The
>> >> >      > >> problem is the coder can be "pulled out" in different ways.
>> >> >      > >>
>> >> >      > >> For example, let's say we have the following Proto
>> PCollection
>> >> >     coder
>> >> >      > >> with non-standard coder "CustomCoder" as the key coder:
>> >> >      > >>
>> >> >      > >>     KvCoder<CustomCoder, VarIntCoder>
>> >> >      > >>
>> >> >      > >>   From the Runner side, this currently looks like the
>> following:
>> >> >      > >>
>> >> >      > >>     PCol: KvCoder<LengthPrefixCoder<ByteArrayCoder>,
>> VarIntCoder>
>> >> >      > >>     Key:  LengthPrefixCoder<ByteArrayCoder>
>> >> >      > >
>> >> >      > > This is, I think, where the error is. If the proto references
>> >> >      > > KvCoder<CustomCoder, VarIntCoder>, it should not be pulled out as
>> >> >      > > KvCoder<LengthPrefixCoder<ByteArrayCoder>, VarIntCoder>, as that
>> >> >      > > doesn't have the same encoding. Trying to instantiate such a coder
>> >> >      > > should be an error. Instead, the runner knows ahead of time
>> that it
>> >> >      > > will need to instantiate this coder, and should update the
>> bundle
>> >> >      > > processor to specify KvCoder<LengthPrefixCoder<CustomCoder>,
>> >> >      > > VarIntCoder> as the coder so both can pull it out in a
>> >> >     consistent way.
>> >> >      > >
>> >> >      > > When the coder is KvCoder<LengthPrefixCoder<CustomCoder>,
>> >> >     VarIntCoder>
>> >> >      > > instantiating it as KvCoder<ByteArrayCoder, VarIntCoder> on
>> the
>> >> >     runner
>> >> >      > > is of course OK as they do have the same encoding.
>> >> >      > >
>> >> >      > >> At the SDK Harness, we have the coder available:
>> >> >      > >>
>> >> >      > >>     PCol: KvCoder<CustomCoder, VarIntCoder>
>> >> >      > >>     Key:  CustomCoder
>> >> >      > >>
>> >> >      > >> Currently, when the SDK Harness serializes a key for a
>> state
>> >> >     request,
>> >> >      > >> the custom coder may happen to add a length prefix, or it
>> may
>> >> >     not. It
>> >> >      > >> depends on the coder used. The correct behavior would be to
>> >> >     use the same
>> >> >      > >> representation as on the Runner side.
>> >> >      > >>
>> >> >      > >>> Specifically, "We have no way of telling from the Runner
>> >> >     side, if a length prefix has been used or not." seems false
>> >> >      > >>
>> >> >      > >> The Runner cannot inspect an unknown coder, it only has the
>> >> >     opaque Proto
>> >> >      > >> information available which does not allow introspection of
>> >> >     non-standard
>> >> >      > >> coders. With the current state, the Runner may think the
>> coder
>> >> >     adds a
>> >> >      > >> length prefix but the Python SDK worker could choose to add
>> >> >     none. This
>> >> >      > >> produces an inconsistent key encoding. See above.
>> >> >      > >
>> >> >      > > I think what's being conflated here is "the Coder has been
>> >> >     wrapped in
>> >> >      > > a LengthPrefixCoder" vs. "the coder does length prefixing."
>> >> >     These are
>> >> >      > > two orthogonal concepts. The runner in general only knows
>> the
>> >> >     former.
>> >> >      > >
>> >> >      > >> It looks like the key encoding for state requests on the
>> >> >     Python SDK
>> >> >      > >> Harness side is broken. For transferring elements of a
>> >> >     PCollection, the
>> >> >      > >> coders are obviously working correctly, but for encoding
>> >> >     solely the key
>> >> >      > >> of an element, there is a consistency issue.
>> >> >      > >>
>> >> >      > >>
>> >> >      > >> -Max
>> >> >      > >>
>> >> >      > >> On 06.11.19 05:35, Kenneth Knowles wrote:
>> >> >      > >>> Specifically, "We have no way of telling from the Runner
>> >> >     side, if a
>> >> >      > >>> length prefix has been used or not." seems false. The
>> runner
>> >> >     has all the
>> >> >      > >>> information since length prefix is a model coder. Didn't
>> we
>> >> >     agree that
>> >> >      > >>> all coders should be self-delimiting in runner/SDK
>> interactions,
>> >> >      > >>> requiring length-prefix only when there is an opaque or
>> >> >     dynamic-length
>> >> >      > >>> value? I assume you mean that at runtime the worker for a
>> >> >     given engine
>> >> >      > >>> does not know?
>> >> >      > >>>
>> >> >      > >>> Kenn
>> >> >      > >>>
>> >> >      > >>> On Tue, Nov 5, 2019 at 3:19 PM Luke Cwik <lc...@google.com> wrote:
>> >> >      > >>>
>> >> >      > >>>      +1 to what Robert said.
>> >> >      > >>>
>> >> >      > >>>      On Tue, Nov 5, 2019 at 2:36 PM Robert Bradshaw <rober...@google.com> wrote:
>> >> >      > >>>
>> >> >      > >>>          The Coder used for State/Timers in a
>> StatefulDoFn is
>> >> >     pulled out
>> >> >      > >>>          of the
>> >> >      > >>>          input PCollection. If a Runner needs to
>> partition by
>> >> >     this coder, it
>> >> >      > >>>          should ensure the coder of this PCollection
>> matches
>> >> >     with the Coder
>> >> >      > >>>          used to create the serialized bytes that are used
>> >> >     for partitioning
>> >> >      > >>>          (whether or not this is length-prefixed).
>> >> >      > >>>
>> >> >      > >>>          Concretely, the graph looks like
>> >> >      > >>>
>> >> >      > >>>
>> >> >      > >>>          Runner                          SDK Harness
>> >> >      > >>>
>> >> >      > >>>          WriteToGbk
>> >> >      > >>>               |
>> >> >      > >>>          ReadFromGbk
>> >> >      > >>>               |
>> >> >      > >>>          RunnerMapFn.processKeyValue(key, value)
>> >> >      > >>>               |
>> >> >      > >>>               WriteToDataChannel
>> >> >      > >>>                       ------------------------>
>> >> >      > >>>                            ReadFromDataChannel
>> >> >      > >>>                                          |
>> >> >      > >>>                                      (pcIn)
>> >> >      > >>>                                          |
>> >> >      > >>>                             MyStatefulDoFn.process(key,
>> value)
>> >> >      > >>>
>> >> >      > >>>          Now the (key part of the) Coder of pcIn, which
>> comes
>> >> >     from the proto
>> >> >      > >>>          that the Runner sent to the SDK, must match the
>> (key
>> >> >     part of the)
>> >> >      > >>>          encoding used in WriteToGbk and ReadFromGbk. If a
>> >> >      > >>>          LengthPrefix is added in one spot, it must be added
>> >> >      > >>>          in the other.
>> >> >      > >>>
>> >> >      > >>>
>> >> >      > >>>          [1]
>> >> >      > >>>
>> >> >
>> https://github.com/apache/beam/blob/release-2.17.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L1183
>> >> >      > >>>
>> >> >      > >>>          On Tue, Nov 5, 2019 at 1:25 PM Maximilian Michels <m...@apache.org> wrote:
>> >> >      > >>>           >
>> >> >      > >>>           > Hi,
>> >> >      > >>>           >
>> >> >      > >>>           > I wanted to get your opinion on something
>> that I
>> >> >     have been
>> >> >      > >>>          struggling
>> >> >      > >>>           > with. It is about the coders for state
>> requests
>> >> >     in portable
>> >> >      > >>>          pipelines.
>> >> >      > >>>           >
>> >> >      > >>>           > In contrast to "classic" Beam, the Runner is
>> not
>> >> >     guaranteed
>> >> >      > >>>          to know
>> >> >      > >>>           > which coder is used by the SDK. If the SDK
>> >> >     happens to use a
>> >> >      > >>>          standard
>> >> >      > >>>           > coder (also known as model coder), we will
>> also
>> >> >     have it
>> >> >      > >>>          available at the
>> >> >      > >>>           > Runner, i.e. if the Runner is written in one
>> of
>> >> >     the SDK
>> >> >      > >>>          languages (e.g.
>> >> >      > >>>           > Java). However, when we do not have a standard
>> >> >     coder, we just
>> >> >      > >>>          treat the
>> >> >      > >>>           > data from the SDK as a blob and just pass it
>> >> >     around as bytes.
>> >> >      > >>>           >
>> >> >      > >>>           > Problem
>> >> >      > >>>           > =======
>> >> >      > >>>           >
>> >> >      > >>>           > In the case of state requests which the SDK
>> >> >     Harness authors
>> >> >      > >>>          to the
>> >> >      > >>>           > Runner, we would like for the key associated
>> with
>> >> >     the state
>> >> >      > >>>          request to
>> >> >      > >>>           > match the key of the element which led to
>> >> >     initiating the
>> >> >      > >>>          state request.
>> >> >      > >>>           >
>> >> >      > >>>           > Example:
>> >> >      > >>>           >
>> >> >      > >>>           > Runner                 SDK Harness
>> >> >      > >>>           > ------                 -----------
>> >> >      > >>>           >
>> >> >      > >>>           > KV["key","value"]  --> Process Element
>> >> >      > >>>           >                                |
>> >> >      > >>>           > LookupState("key") <-- Request state of "key"
>> >> >      > >>>           >          |
>> >> >      > >>>           >     State["key"]    --> Receive state
>> >> >      > >>>           >
>> >> >      > >>>           >
>> >> >      > >>>           > For stateful DoFns, the Runner partitions the
>> >> >     data based on
>> >> >      > >>>          the key. In
>> >> >      > >>>           > Flink, this partitioning must not change
>> during
>> >> >     the lifetime of a
>> >> >      > >>>           > pipeline because the checkpointing otherwise
>> >> >     breaks[0]. The
>> >> >      > >>>          key is
>> >> >      > >>>           > extracted from the element and stored encoded.
>> >> >      > >>>           >
>> >> >      > >>>           > If we have a standard coder, it is basically
>> the
>> >> >     same as in the
>> >> >      > >>>           > "classic" Runner which takes the key and
>> >> >     serializes it.
>> >> >      > >>>          However, when we
>> >> >      > >>>           > have an SDK-specific coder, we basically do
>> not
>> >> >     know how it
>> >> >      > >>>          encodes. So
>> >> >      > >>>           > far, we have been using the coder instantiated
>> >> >     from the
>> >> >      > >>>          Proto, which is
>> >> >      > >>>           > basically a LengthPrefixCoder[ByteArrayCoder]
>> or
>> >> >     similar[1].
>> >> >      > >>>          We have had
>> >> >      > >>>           > problems with this because the key encoding of
>> >> >     Java SDK state
>> >> >      > >>>          requests
>> >> >      > >>>           > did not match the key encoding on the Runner
>> side
>> >> >     [2]. In an
>> >> >      > >>>          attempt to
>> >> >      > >>>           > fix those, it is now partly broken for
>> portable
>> >> >     Python pipelines.
>> >> >      > >>>           > Partly, because it "only" affects non-standard
>> >> >     coders.
>> >> >      > >>>           >
>> >> >      > >>>           > Non-standard coders yield the aforementioned
>> >> >      > >>>           > LengthPrefixCoder[ByteArrayCoder]. Now,
>> following
>> >> >     the usual
>> >> >      > >>>          encoding
>> >> >      > >>>           > scheme, we would simply encode the key using
>> this
>> >> >     coder.
>> >> >      > >>>          However, for
>> >> >      > >>>           > state requests, the Python SDK leaves out the
>> >> >     length prefix
>> >> >      > >>>          for certain
>> >> >      > >>>           > coders, e.g. for primitives like int or byte.
>> It
>> >> >     is possible
>> >> >      > >>>          that one
>> >> >      > >>>           > coder uses a length prefix, while another
>> >> >     doesn't. We have no
>> >> >      > >>>          way of
>> >> >      > >>>           > telling from the Runner side, if a length
>> prefix
>> >> >     has been
>> >> >      > >>>          used or not.
>> >> >      > >>>           > As a result, the keys do not match on the Runner
>> >> >      > >>>           > side and the partitioning is broken.
>> >> >      > >>>           >
>> >> >      > >>>           >
>> >> >      > >>>           > How to solve this?
>> >> >      > >>>           > ==================
>> >> >      > >>>           >
>> >> >      > >>>           > (1) Should this simply be fixed on the Python
>> SDK
>> >> >     side? One
>> >> >      > >>>          fix would be
>> >> >      > >>>           > to always add a length prefix to the key
>> in state
>> >> >      > >>>          requests, even for
>> >> >      > >>>           > primitive coders like VarInt which do not use
>> one.
>> >> >      > >>>           >
>> >> >      > >>>           > OR
>> >> >      > >>>           >
>> >> >      > >>>           > (2) Should the Runner detect that a
>> non-standard
>> >> >     coder is
>> >> >      > >>>          used? If so,
>> >> >      > >>>           > it should just pass the bytes from the SDK
>> >> >     Harness and never
>> >> >      > >>>          make an
>> >> >      > >>>           > attempt to construct a coder based on the
>> Proto.
>> >> >      > >>>           >
>> >> >      > >>>           >
>> >> >      > >>>           > Thinking about it now, it seems pretty obvious
>> >> >     that (2) is
>> >> >      > >>>          the most
>> >> >      > >>>           > feasible way to avoid complications across all
>> >> >     current and
>> >> >      > >>>          future SDKs
>> >> >      > >>>           > for key encodings. Still, it is odd that the
>> >> >     Proto contains coder
>> >> >      > >>>           > information which is not usable.
>> >> >      > >>>           >
>> >> >      > >>>           > What do you think?
>> >> >      > >>>           >
>> >> >      > >>>           >
>> >> >      > >>>           > Thanks,
>> >> >      > >>>           > Max
>> >> >      > >>>           >
>> >> >      > >>>           >
>> >> >      > >>>           > [0] It is possible to restart the pipeline and
>> >> >     repartition the
>> >> >      > >>>           > checkpointed data.
>> >> >      > >>>           > [1]
>> >> >      > >>>           >
>> >> >      > >>>
>> >> >
>> https://github.com/apache/beam/blob/c39752af5391fe698a2b4f1489c187ddd4d604c0/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L682
>> >> >      > >>>           > [2]
>> https://issues.apache.org/jira/browse/BEAM-8157
>> >> >      > >>>
>> >> >
>>
>
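To make the key mismatch from the original message concrete, here is a
minimal Python sketch that does not use Beam at all. It assumes the length
prefix is a base-128 varint of the payload size, and it stands in for the
opaque "CustomCoder" with plain UTF-8 encoding. Every function name below
(encode_varint, length_prefix, custom_coder_encode, runner_partition_key,
sdk_state_key_unwrapped, sdk_state_key_option1) is hypothetical and exists
only for illustration; none of it is the actual Beam API.

```python
def encode_varint(n: int) -> bytes:
    """Encode a non-negative int as a base-128 varint (low group first)."""
    if n < 0:
        raise ValueError("length must be non-negative")
    out = bytearray()
    while True:
        bits = n & 0x7F
        n >>= 7
        if n:
            out.append(bits | 0x80)  # more groups follow
        else:
            out.append(bits)
            return bytes(out)


def length_prefix(payload: bytes) -> bytes:
    """What a LengthPrefixCoder wrapper adds: varint(len) + payload."""
    return encode_varint(len(payload)) + payload


def custom_coder_encode(key: str) -> bytes:
    """Stand-in for an SDK-specific coder the runner cannot inspect."""
    return key.encode("utf-8")


def runner_partition_key(key: str) -> bytes:
    """Runner view: the key is LengthPrefixCoder<ByteArrayCoder>, so the
    bytes it partitions on include the length prefix."""
    return length_prefix(custom_coder_encode(key))


def sdk_state_key_unwrapped(key: str) -> bytes:
    """SDK harness view when the proto only says CustomCoder: the state
    request key is encoded without the wrapper."""
    return custom_coder_encode(key)


def sdk_state_key_option1(key: str) -> bytes:
    """SDK harness view under option (1): the proto itself says
    LengthPrefixCoder<CustomCoder>, so the wrapper is applied."""
    return length_prefix(custom_coder_encode(key))


if __name__ == "__main__":
    key = "key"
    print(runner_partition_key(key))      # b'\x03key'
    print(sdk_state_key_unwrapped(key))   # b'key'
    # Mismatch: the state request key does not equal the partition key.
    print(runner_partition_key(key) == sdk_state_key_unwrapped(key))  # False
    # Option (1): both sides read the same coder from the proto.
    print(runner_partition_key(key) == sdk_state_key_option1(key))    # True
```

The point of the sketch is only that the two sides must derive the key bytes
from the same coder description; whether the wrapper is present matters less
than both sides agreeing on it, which is what putting the LengthPrefixCoder
into the proto achieves.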
