On Fri, Nov 8, 2019 at 10:04 PM jincheng sun <sunjincheng...@gmail.com> wrote:
>
> > Let us first define what are "standard coders". Usually it should be the 
> > coders defined in the Proto. However, personally I think the coders defined 
> > in the Java ModelCoders [1] seems more appropriate. The reason is that for 
> > a coder which has already appeared in Proto and still not added to the Java 
> > ModelCoders, it's always replaced by the runner with 
> > LengthPrefixCoder[ByteArrayCoder] and so the SDK harness will decode the 
> > data with LengthPrefixCoder[ByteArrayCoder] instead of the actual coder. 
> > That's to say, that coder does not still take effect in the SDK harness. 
> > Only when the coder is added in ModelCoders, it's 'known' and will take 
> > effect.
>
> Correct this point! The coder which is not contained in the Java ModelCoders 
> is replaced with LengthPrefixCoder[ByteArrayCoder] at runner side and 
> LengthPrefixCoder[CustomCoder] at SDK harness side.
>
> The point here is that the runner determines whether it knows the coder 
> according to the coders defined in the Java ModelCoders, not the coders 
> defined in the proto file. So if taking option 3, the non-standard coders 
> which will be wrapped with LengthPrefixCoder should also be determined by the 
> coders defined in the Java ModerCoders, not the coders defined in the proto 
> file.

Yes.

Both as a matter of principle and pragmatics, it'd be good to avoid
anything about the model only defined in Java files.

Also, when we say "the runner" we cannot assume it's written in Java.
While many Java OSS runners share these libraries, The Universal Local
Runner is written in Python. Dataflow is written (primarily) in C++.
My hope is that the FnAPI will be stable enough that one can even run
multiple versions of the Java SDK with the same runner. What matters
is that (1) if the same URN is used, all runners/SDKs agree on the
encoding (2) there are certain coders (Windowed, LengthPrefixed, and
KV come to mind) that all Runners/SDKs are required to understand, and
(3) runners properly coerce coders they do not understand into coders
that they do if they need to pull out and act on the bytes. The more
coders the runner/SDK understands, the less often it needs to do this.

> jincheng sun <sunjincheng...@gmail.com>于2019年11月9日 周六12:26写道:
>>
>> Hi Robert Bradshaw,
>>
>> Thanks a lot for the explanation. Very interesting topic!
>>
>> Let us first define what are "standard coders". Usually it should be the 
>> coders defined in the Proto. However, personally I think the coders defined 
>> in the Java ModelCoders [1] seems more appropriate. The reason is that for a 
>> coder which has already appeared in Proto and still not added to the Java 
>> ModelCoders, it's always replaced by the runner with 
>> LengthPrefixCoder[ByteArrayCoder] and so the SDK harness will decode the 
>> data with LengthPrefixCoder[ByteArrayCoder] instead of the actual coder. 
>> That's to say, that coder does not still take effect in the SDK harness. 
>> Only when the coder is added in ModelCoders, it's 'known' and will take 
>> effect.
>>
>> So if we take option 3, the non-standard coders which will be wrapped with 
>> LengthPrefixCoder should be synced with the coders defined in the Java 
>> ModerCoders. (From this point of view, option 1 seems more clean!)
>>
>> Please correct me if I missed something. Thanks a lot!
>>
>> Best,
>> Jincheng
>>
>> [1] 
>> https://github.com/apache/beam/blob/01726e9c62313749f9ea7c93063a1178abd1a8db/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java#L59
>>
>> Robert Burke <rob...@frantil.com> 于2019年11月9日周六 上午8:46写道:
>>>
>>> And by "I wasn't clear" I meant "I misread the options".
>>>
>>> On Fri, Nov 8, 2019, 4:14 PM Robert Burke <rob...@frantil.com> wrote:
>>>>
>>>> Reading back, I wasn't clear: the Go SDK does Option (1), putting the LP 
>>>> explicitly during encoding [1] for the runner proto, and explicitly 
>>>> expects LPs to contain a custom coder URN on decode for execution [2]. 
>>>> (Modulo an old bug in Dataflow where the urn was empty)
>>>>
>>>>
>>>> [1] 
>>>> https://github.com/apache/beam/blob/4364a214dfe6d8d5dd84b1bb91d579f466492ca5/sdks/go/pkg/beam/core/runtime/graphx/coder.go#L348
>>>> [2] 
>>>> https://github.com/apache/beam/blob/4364a214dfe6d8d5dd84b1bb91d579f466492ca5/sdks/go/pkg/beam/core/runtime/graphx/coder.go#L219
>>>>
>>>>
>>>> On Fri, Nov 8, 2019, 10:28 AM Robert Bradshaw <rober...@google.com> wrote:
>>>>>
>>>>> On Fri, Nov 8, 2019 at 2:09 AM jincheng sun <sunjincheng...@gmail.com> 
>>>>> wrote:
>>>>> >
>>>>> > Hi,
>>>>> >
>>>>> > Sorry for my late reply. It seems the conclusion has been reached. I 
>>>>> > just want to share my personal thoughts.
>>>>> >
>>>>> > Generally, both option 1 and 3 make sense to me.
>>>>> >
>>>>> > >> The key concept here is not "standard coder" but "coder that the
>>>>> > >> runner does not understand." This knowledge is only in the runner.
>>>>> > >> Also has the downside of (2).
>>>>> >
>>>>> > >Yes, I had assumed "non-standard" and "unknown" are the same, but the
>>>>> > >latter can be a subset of the former, i.e. if a Runner does not support
>>>>> > >all of the standard coders for some reason.
>>>>> >
>>>>> > I'm also assume that "non-standard" and "unknown" are the same. 
>>>>> > Currently, in the runner side[1] it
>>>>> > decides whether the coder is unknown(wrap with length prefix coder) 
>>>>> > according to whether the coder is among
>>>>> > the standard coders. It will not communicate with harness to make this 
>>>>> > decision.
>>>>> >
>>>>> > So, from my point of view, we can update the PR according to option 1 
>>>>> > or 3.
>>>>> >
>>>>> > [1] 
>>>>> > https://github.com/apache/beam/blob/66a67e6580b93906038b31ae7070204cec90999c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L62
>>>>>
>>>>> That list is populated in Java code [1] and has typically been a
>>>>> subset of what is in the proto file. Things like StringUtf8Coder and
>>>>> DoubleCoder have been added at different times to different SDKs and
>>>>> Runners, sometimes long after the URN is in the proto. Having to keep
>>>>> this list synchronized (and versioned) would be a regression.
>>>>>
>>>>> [1] 
>>>>> https://github.com/apache/beam/blob/release-2.17.0/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java
>>>>>
>>>>> The PR taking approach (1) looks good at a first glance (I see others
>>>>> are reviewing it). Thanks.
>>>>>
>>>>> > Maximilian Michels <m...@apache.org> 于2019年11月8日周五 上午3:35写道:
>>>>> >>
>>>>> >> > While the Go SDK doesn't yet support a State API, Option 3) is what 
>>>>> >> > the Go SDK does for all non-standard coders (aka custom coders) 
>>>>> >> > anyway.
>>>>> >>
>>>>> >> For wire transfer, the Java Runner also adds a LengthPrefixCoder for 
>>>>> >> the
>>>>> >> coder and its subcomponents. The problem is that this is an implicit
>>>>> >> assumption made. In the Proto, we do not have this represented. This is
>>>>> >> why **for state requests**, we end up with a
>>>>> >> "LengthPrefixCoder[CustomCoder]" on the Runner and a "CustomCoder" on
>>>>> >> the SDK Harness side. Note that the Python Harness does wrap unknown
>>>>> >> coders in a LengthPrefixCoder for transferring regular elements, but 
>>>>> >> the
>>>>> >> LengthPrefixCoder is not preserved for the state requests.
>>>>> >>
>>>>> >> In that sense (3) is good because it follows this implicit notion of
>>>>> >> adding a LengthPrefixCoder for wire transfer, but applies it to state
>>>>> >> requests.
>>>>> >>
>>>>> >> However, option (1) is most reliable because the LengthPrefixCoder is
>>>>> >> actually in the Proto. So "CustomCoder" will always be represented as
>>>>> >> "LengthPrefixCoder[CustomCoder]", and only standard coders will be 
>>>>> >> added
>>>>> >> without a LengthPrefixCoder.
>>>>> >>
>>>>> >> > I'd really like to avoid implicit agreements about how the coder that
>>>>> >> > should be used differs from what's specified in the proto in 
>>>>> >> > different
>>>>> >> > contexts.
>>>>> >>
>>>>> >> Option (2) would work on top of the existing logic because replacing a
>>>>> >> non-standard coder with a "NOOP coder" would just be used by the Runner
>>>>> >> to produce a serialized version of the key for partitioning. Flink
>>>>> >> always operates on the serialized key, be it standard or non-standard
>>>>> >> coder. It wouldn't be necessary to change any of the existing wire
>>>>> >> transfer logic or representation. I understand that it would be less
>>>>> >> ideal, but maybe easier to fix for the release.
>>>>> >>
>>>>> >> > The key concept here is not "standard coder" but "coder that the
>>>>> >> > runner does not understand." This knowledge is only in the runner.
>>>>> >> > Also has the downside of (2).
>>>>> >>
>>>>> >> Yes, I had assumed "non-standard" and "unknown" are the same, but the
>>>>> >> latter can be a subset of the former, i.e. if a Runner does not support
>>>>> >> all of the standard coders for some reason.
>>>>> >>
>>>>> >> > This means that the wire format that the runner sends for the "key" 
>>>>> >> > represents the exact same wire format it will receive for state 
>>>>> >> > requests.
>>>>> >>
>>>>> >> The wire format for the entire element is the same. Otherwise we
>>>>> >> wouldn't be able to process data between the Runner and the SDK 
>>>>> >> Harness.
>>>>> >> However, the problem is that the way the Runner instantiates the key
>>>>> >> coder to partition elements, does not match how the SDK encodes the key
>>>>> >> when it sends a state request to the Runner. Conceptually, those two
>>>>> >> situations should be the same, but in practice they are not.
>>>>> >>
>>>>> >>
>>>>> >> Now that I thought about it again option (1) is probably the most
>>>>> >> explicit and in that sense cleanest. However, option (3) is kind of 
>>>>> >> fair
>>>>> >> because it would just replicate the implicit LengthPrefixCoder behavior
>>>>> >> we have for general wire transfer also for state requests. Option (2) I
>>>>> >> suppose is the most implicit and runner-specific, should probably be
>>>>> >> avoided in the long run.
>>>>> >>
>>>>> >> So I'd probably opt for (1) and I would update the PR[1] rather soon
>>>>> >> because this currently blocks the release, as this is a regression from
>>>>> >> 2.16.0.[2]
>>>>> >>
>>>>> >>
>>>>> >> -Max
>>>>> >>
>>>>> >> [1] https://github.com/apache/beam/pull/9997
>>>>> >> [2] (In 2.16.0 it worked for Python because the Runner used a
>>>>> >> ByteArrayCoder with the OUTER encoding context for the key which was
>>>>> >> basically option (2). Only problem that, for standard coders the Java
>>>>> >> SDK Harness produced non-matching state request keys, due to it using
>>>>> >> the NESTED context.)
>>>>> >>
>>>>> >> On 07.11.19 18:01, Luke Cwik wrote:
>>>>> >> >
>>>>> >> >
>>>>> >> > On Thu, Nov 7, 2019 at 8:22 AM Robert Bradshaw <rober...@google.com
>>>>> >> > <mailto:rober...@google.com>> wrote:
>>>>> >> >
>>>>> >> >     On Thu, Nov 7, 2019 at 6:26 AM Maximilian Michels 
>>>>> >> > <m...@apache.org
>>>>> >> >     <mailto:m...@apache.org>> wrote:
>>>>> >> >      >
>>>>> >> >      > Thanks for the feedback thus far. Some more comments:
>>>>> >> >      >
>>>>> >> >      > > Instead, the runner knows ahead of time that it
>>>>> >> >      > > will need to instantiate this coder, and should update the 
>>>>> >> > bundle
>>>>> >> >      > > processor to specify KvCoder<LengthPrefixCoder<CustomCoder>,
>>>>> >> >      > > VarIntCoder> as the coder so both can pull it out in a
>>>>> >> >     consistent way.
>>>>> >> >      >
>>>>> >> >      > By "update the bundle processor", do you mean modifying the
>>>>> >> >      > ProcessBundleDescriptor's BagUserState with the correct key 
>>>>> >> > coder?
>>>>> >> >      > Conceptually that is possible, but the current implementation
>>>>> >> >     does not
>>>>> >> >      > allow for this to happen:
>>>>> >> >      >
>>>>> >> >     
>>>>> >> > https://github.com/apache/beam/blob/076a037e53ca39b61c1dfeb580527bc8d0371dc1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java#L284
>>>>> >> >      > It enforces ByteString which does not tell the SDK Harness 
>>>>> >> > anything
>>>>> >> >      > about the desired encoding.
>>>>> >> >
>>>>> >> >     I meant update the BundleProcessDescriptor proto that is sent to 
>>>>> >> > the
>>>>> >> >     SDK
>>>>> >> >     
>>>>> >> > https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L140,
>>>>> >> >     essentially option (1).
>>>>> >> >
>>>>> >> >
>>>>> >> > For clarity, the "key" coder is specified by the stateful ParDo's 
>>>>> >> > main
>>>>> >> > input PCollection. This means that the ProcessBundleDescriptor should
>>>>> >> > have something that has the length prefix as part of the remote grpc
>>>>> >> > port specification AND the PCollection that follows it which is the 
>>>>> >> > main
>>>>> >> > input for the stateful ParDo. This means that the wire format that 
>>>>> >> > the
>>>>> >> > runner sends for the "key" represents the exact same wire format it 
>>>>> >> > will
>>>>> >> > receive for state requests.
>>>>> >> >
>>>>> >> > I see what you mean Max,
>>>>> >> > https://github.com/apache/beam/blob/076a037e53ca39b61c1dfeb580527bc8d0371dc1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java#L284
>>>>> >> > could change to represent the actual coder (whether it be
>>>>> >> > LengthPrefixCoder<BytesCoder> or some other model coder combination).
>>>>> >> > Currently that logic assumes that the runner can perform the 
>>>>> >> > backwards
>>>>> >> > mapping by decoding the bytestring with the appropriate coder.
>>>>> >> >
>>>>> >> >      > Since the above does not seem feasible, I see the following 
>>>>> >> > options:
>>>>> >> >      >
>>>>> >> >      > (1) Modify the pipeline Proto before the translation and wrap 
>>>>> >> > a
>>>>> >> >      > LengthPrefixCoder around non-standard key coders for stateful
>>>>> >> >      > transforms. This would change the encoding for the entire
>>>>> >> >     element, to be
>>>>> >> >      > sure that the key coder for state requests contains a
>>>>> >> >     LengthPrefixCoder
>>>>> >> >      > for state requests from the SDK Harness. Not optimal.
>>>>> >> >
>>>>> >> >     Yes. The contract should be that both the runner and SDK use the
>>>>> >> >     coders that are specified in the proto. The runner controls the 
>>>>> >> > proto,
>>>>> >> >     and should ensure it only sends protos it will be able to handle 
>>>>> >> > the
>>>>> >> >     SDK responding to. I'm not seeing why this is sub-optimal.
>>>>> >> >
>>>>> >> >      > (2) Add a new method WireCoders#instantiateRunnerWireKeyCoder 
>>>>> >> > which
>>>>> >> >      > returns the correct key coder, i.e. for standard coders, the 
>>>>> >> > concrete
>>>>> >> >      > coder, and for non-standard coders a ByteArrayCoder. We also 
>>>>> >> > need to
>>>>> >> >      > ensure the key encoding on the Runner side is OUTER context, 
>>>>> >> > to avoid
>>>>> >> >      > adding a length prefix to the encoded bytes. Basically, the
>>>>> >> >     non-standard
>>>>> >> >      > coders result in a NOOP coder which does not touch the key 
>>>>> >> > bytes.
>>>>> >> >
>>>>> >> >     I'd really like to avoid implicit agreements about how the coder 
>>>>> >> > that
>>>>> >> >     should be used differs from what's specified in the proto in 
>>>>> >> > different
>>>>> >> >     contexts.
>>>>> >> >
>>>>> >> >      > (3) Patch the Python SDK to ensure non-standard state key 
>>>>> >> > coders are
>>>>> >> >      > always wrapped in a LengthPrefixCoder. That way, we can keep 
>>>>> >> > the
>>>>> >> >      > existing logic on the Runner side.
>>>>> >> >
>>>>> >> >     The key concept here is not "standard coder" but "coder that the
>>>>> >> >     runner does not understand." This knowledge is only in the 
>>>>> >> > runner.
>>>>> >> >     Also has the downside of (2).
>>>>> >> >
>>>>> >> >      > Option (2) seems like the most practical.
>>>>> >> >      >
>>>>> >> >      > -Max
>>>>> >> >      >
>>>>> >> >      > On 06.11.19 17:26, Robert Bradshaw wrote:
>>>>> >> >      > > On Wed, Nov 6, 2019 at 2:55 AM Maximilian Michels
>>>>> >> >     <m...@apache.org <mailto:m...@apache.org>> wrote:
>>>>> >> >      > >>
>>>>> >> >      > >> Let me try to clarify:
>>>>> >> >      > >>
>>>>> >> >      > >>> The Coder used for State/Timers in a StatefulDoFn is 
>>>>> >> > pulled
>>>>> >> >     out of the
>>>>> >> >      > >>> input PCollection. If a Runner needs to partition by this
>>>>> >> >     coder, it
>>>>> >> >      > >>> should ensure the coder of this PCollection matches with 
>>>>> >> > the
>>>>> >> >     Coder
>>>>> >> >      > >>> used to create the serialized bytes that are used for
>>>>> >> >     partitioning
>>>>> >> >      > >>> (whether or not this is length-prefixed).
>>>>> >> >      > >>
>>>>> >> >      > >> That is essentially what I had assumed when I wrote the 
>>>>> >> > code. The
>>>>> >> >      > >> problem is the coder can be "pulled out" in different ways.
>>>>> >> >      > >>
>>>>> >> >      > >> For example, let's say we have the following Proto 
>>>>> >> > PCollection
>>>>> >> >     coder
>>>>> >> >      > >> with non-standard coder "CustomCoder" as the key coder:
>>>>> >> >      > >>
>>>>> >> >      > >>     KvCoder<CustomCoder, VarIntCoder>
>>>>> >> >      > >>
>>>>> >> >      > >>   From the Runner side, this currently looks like the 
>>>>> >> > following:
>>>>> >> >      > >>
>>>>> >> >      > >>     PCol: KvCoder<LengthPrefixCoder<ByteArrayCoder>, 
>>>>> >> > VarIntCoder>
>>>>> >> >      > >>     Key:  LengthPrefixCoder<ByteArrayCoder>
>>>>> >> >      > >
>>>>> >> >      > > This is I think where the error is. When If the proto 
>>>>> >> > references
>>>>> >> >      > > KvCoder<CustomCoder, VarIntCoder> it should not be pulled 
>>>>> >> > out as
>>>>> >> >      > > KvCoder<LengthPrefixCoder<ByteArrayCoder>, VarIntCoder>; as 
>>>>> >> > that
>>>>> >> >      > > doesn't have the same encoding. Trying to do instantiate 
>>>>> >> > such a
>>>>> >> >     coder
>>>>> >> >      > > should be an error. Instead, the runner knows ahead of time 
>>>>> >> > that it
>>>>> >> >      > > will need to instantiate this coder, and should update the 
>>>>> >> > bundle
>>>>> >> >      > > processor to specify KvCoder<LengthPrefixCoder<CustomCoder>,
>>>>> >> >      > > VarIntCoder> as the coder so both can pull it out in a
>>>>> >> >     consistent way.
>>>>> >> >      > >
>>>>> >> >      > > When the coder is KvCoder<LengthPrefixCoder<CustomCoder>,
>>>>> >> >     VarIntCoder>
>>>>> >> >      > > instantiating it as KvCoder<ByteArrayCoder, VarIntCoder> on 
>>>>> >> > the
>>>>> >> >     runner
>>>>> >> >      > > is of course OK as they do have the same encoding.
>>>>> >> >      > >
>>>>> >> >      > >> At the SDK Harness, we have the coder available:
>>>>> >> >      > >>
>>>>> >> >      > >>     PCol: KvCoder<CustomCoder, VarIntCoder>
>>>>> >> >      > >>     Key:  CustomCoder
>>>>> >> >      > >>
>>>>> >> >      > >> Currently, when the SDK Harness serializes a key for a 
>>>>> >> > state
>>>>> >> >     request,
>>>>> >> >      > >> the custom coder may happen to add a length prefix, or it 
>>>>> >> > may
>>>>> >> >     not. It
>>>>> >> >      > >> depends on the coder used. The correct behavior would be to
>>>>> >> >     use the same
>>>>> >> >      > >> representation as on the Runner side.
>>>>> >> >      > >>
>>>>> >> >      > >>> Specifically, "We have no way of telling from the Runner
>>>>> >> >     side, if a length prefix has been used or not." seems false
>>>>> >> >      > >>
>>>>> >> >      > >> The Runner cannot inspect an unknown coder, it only has the
>>>>> >> >     opaque Proto
>>>>> >> >      > >> information available which does not allow introspection of
>>>>> >> >     non-standard
>>>>> >> >      > >> coders. With the current state, the Runner may think the 
>>>>> >> > coder
>>>>> >> >     adds a
>>>>> >> >      > >> length prefix but the Python SDK worker could choose to add
>>>>> >> >     none. This
>>>>> >> >      > >> produces an inconsistent key encoding. See above.
>>>>> >> >      > >
>>>>> >> >      > > I think what's being conflated here is "the Coder has been
>>>>> >> >     wrapped in
>>>>> >> >      > > a LengthPrefixCoder" vs. "the coder does length prefixing."
>>>>> >> >     These are
>>>>> >> >      > > two orthogonal concepts. The runner in general only knows 
>>>>> >> > the
>>>>> >> >     former.
>>>>> >> >      > >
>>>>> >> >      > >> It looks like the key encoding for state requests on the
>>>>> >> >     Python SDK
>>>>> >> >      > >> Harness side is broken. For transferring elements of a
>>>>> >> >     PCollection, the
>>>>> >> >      > >> coders are obviously working correctly, but for encoding
>>>>> >> >     solely the key
>>>>> >> >      > >> of an element, there is a consistency issue.
>>>>> >> >      > >>
>>>>> >> >      > >>
>>>>> >> >      > >> -Max
>>>>> >> >      > >>
>>>>> >> >      > >> On 06.11.19 05:35, Kenneth Knowles wrote:
>>>>> >> >      > >>> Specifically, "We have no way of telling from the Runner
>>>>> >> >     side, if a
>>>>> >> >      > >>> length prefix has been used or not." seems false. The 
>>>>> >> > runner
>>>>> >> >     has all the
>>>>> >> >      > >>> information since length prefix is a model coder. Didn't 
>>>>> >> > we
>>>>> >> >     agree that
>>>>> >> >      > >>> all coders should be self-delimiting in runner/SDK 
>>>>> >> > interactions,
>>>>> >> >      > >>> requiring length-prefix only when there is an opaque or
>>>>> >> >     dynamic-length
>>>>> >> >      > >>> value? I assume you mean that at runtime the worker for a
>>>>> >> >     given engine
>>>>> >> >      > >>> does not know?
>>>>> >> >      > >>>
>>>>> >> >      > >>> Kenn
>>>>> >> >      > >>>
>>>>> >> >      > >>> On Tue, Nov 5, 2019 at 3:19 PM Luke Cwik <lc...@google.com
>>>>> >> >     <mailto:lc...@google.com>
>>>>> >> >      > >>> <mailto:lc...@google.com <mailto:lc...@google.com>>> 
>>>>> >> > wrote:
>>>>> >> >      > >>>
>>>>> >> >      > >>>      +1 to what Robert said.
>>>>> >> >      > >>>
>>>>> >> >      > >>>      On Tue, Nov 5, 2019 at 2:36 PM Robert Bradshaw
>>>>> >> >     <rober...@google.com <mailto:rober...@google.com>
>>>>> >> >      > >>>      <mailto:rober...@google.com
>>>>> >> >     <mailto:rober...@google.com>>> wrote:
>>>>> >> >      > >>>
>>>>> >> >      > >>>          The Coder used for State/Timers in a 
>>>>> >> > StatefulDoFn is
>>>>> >> >     pulled out
>>>>> >> >      > >>>          of the
>>>>> >> >      > >>>          input PCollection. If a Runner needs to 
>>>>> >> > partition by
>>>>> >> >     this coder, it
>>>>> >> >      > >>>          should ensure the coder of this PCollection 
>>>>> >> > matches
>>>>> >> >     with the Coder
>>>>> >> >      > >>>          used to create the serialized bytes that are used
>>>>> >> >     for partitioning
>>>>> >> >      > >>>          (whether or not this is length-prefixed).
>>>>> >> >      > >>>
>>>>> >> >      > >>>          Concretely, the graph looks like
>>>>> >> >      > >>>
>>>>> >> >      > >>>
>>>>> >> >      > >>>          Runner                          SDK Harness
>>>>> >> >      > >>>
>>>>> >> >      > >>>          WriteToGbk
>>>>> >> >      > >>>               |
>>>>> >> >      > >>>          ReadFromGbk
>>>>> >> >      > >>>               |
>>>>> >> >      > >>>          RunnerMapFn.processKeyValue(key, value)
>>>>> >> >      > >>>               |
>>>>> >> >      > >>>               WriteToDataChannel
>>>>> >> >      > >>>                       ------------------------>
>>>>> >> >      > >>>                            ReadFromDataChannel
>>>>> >> >      > >>>                                          |
>>>>> >> >      > >>>                                      (pcIn)
>>>>> >> >      > >>>                                          |
>>>>> >> >      > >>>                             MyStatefulDoFn.process(key, 
>>>>> >> > value)
>>>>> >> >      > >>>
>>>>> >> >      > >>>          Now the (key part of the) Coder of pcIn, which 
>>>>> >> > comes
>>>>> >> >     from the proto
>>>>> >> >      > >>>          that the Runner sent to the SDK, must match the 
>>>>> >> > (key
>>>>> >> >     part of the)
>>>>> >> >      > >>>          encoding used in WriteToGbk and ReadFromGbk. If a
>>>>> >> >     LenthPrefix is
>>>>> >> >      > >>>          added
>>>>> >> >      > >>>          in one spot, it must be added in the other.
>>>>> >> >      > >>>
>>>>> >> >      > >>>
>>>>> >> >      > >>>          [1]
>>>>> >> >      > >>>
>>>>> >> >     
>>>>> >> > https://github.com/apache/beam/blob/release-2.17.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L1183
>>>>> >> >      > >>>
>>>>> >> >      > >>>          On Tue, Nov 5, 2019 at 1:25 PM Maximilian Michels
>>>>> >> >      > >>>          <m...@apache.org <mailto:m...@apache.org>
>>>>> >> >     <mailto:m...@apache.org <mailto:m...@apache.org>>> wrote:
>>>>> >> >      > >>>           >
>>>>> >> >      > >>>           > Hi,
>>>>> >> >      > >>>           >
>>>>> >> >      > >>>           > I wanted to get your opinion on something 
>>>>> >> > that I
>>>>> >> >     have been
>>>>> >> >      > >>>          struggling
>>>>> >> >      > >>>           > with. It is about the coders for state 
>>>>> >> > requests
>>>>> >> >     in portable
>>>>> >> >      > >>>          pipelines.
>>>>> >> >      > >>>           >
>>>>> >> >      > >>>           > In contrast to "classic" Beam, the Runner is 
>>>>> >> > not
>>>>> >> >     guaranteed
>>>>> >> >      > >>>          to know
>>>>> >> >      > >>>           > which coder is used by the SDK. If the SDK
>>>>> >> >     happens to use a
>>>>> >> >      > >>>          standard
>>>>> >> >      > >>>           > coder (also known as model coder), we will 
>>>>> >> > also
>>>>> >> >     have it
>>>>> >> >      > >>>          available at the
>>>>> >> >      > >>>           > Runner, i.e. if the Runner is written in one 
>>>>> >> > of
>>>>> >> >     the SDK
>>>>> >> >      > >>>          languages (e.g.
>>>>> >> >      > >>>           > Java). However, when we do not have a standard
>>>>> >> >     coder, we just
>>>>> >> >      > >>>          treat the
>>>>> >> >      > >>>           > data from the SDK as a blob and just pass it
>>>>> >> >     around as bytes.
>>>>> >> >      > >>>           >
>>>>> >> >      > >>>           > Problem
>>>>> >> >      > >>>           > =======
>>>>> >> >      > >>>           >
>>>>> >> >      > >>>           > In the case of state requests which the SDK
>>>>> >> >     Harness authors
>>>>> >> >      > >>>          to the
>>>>> >> >      > >>>           > Runner, we would like for the key associated 
>>>>> >> > with
>>>>> >> >     the state
>>>>> >> >      > >>>          request to
>>>>> >> >      > >>>           > match the key of the element which led to
>>>>> >> >     initiating the
>>>>> >> >      > >>>          state request.
>>>>> >> >      > >>>           >
>>>>> >> >      > >>>           > Example:
>>>>> >> >      > >>>           >
>>>>> >> >      > >>>           > Runner                 SDK Harness
>>>>> >> >      > >>>           > ------                 -----------
>>>>> >> >      > >>>           >
>>>>> >> >      > >>>           > KV["key","value"]  --> Process Element
>>>>> >> >      > >>>           >                                |
>>>>> >> >      > >>>           > LookupState("key") <-- Request state of "key"
>>>>> >> >      > >>>           >          |
>>>>> >> >      > >>>           >     State["key"]    --> Receive state
>>>>> >> >      > >>>           >
>>>>> >> >      > >>>           >
>>>>> >> >      > >>>           > For stateful DoFns, the Runner partitions the
>>>>> >> >     data based on
>>>>> >> >      > >>>          the key. In
>>>>> >> >      > >>>           > Flink, this partitioning must not change 
>>>>> >> > during
>>>>> >> >     the lifetime of a
>>>>> >> >      > >>>           > pipeline because the checkpointing otherwise
>>>>> >> >     breaks[0]. The
>>>>> >> >      > >>>          key is
>>>>> >> >      > >>>           > extracted from the element and stored encoded.
>>>>> >> >      > >>>           >
>>>>> >> >      > >>>           > If we have a standard coder, it is basically 
>>>>> >> > the
>>>>> >> >     same as in the
>>>>> >> >      > >>>           > "classic" Runner which takes the key and
>>>>> >> >     serializes it.
>>>>> >> >      > >>>          However, when we
>>>>> >> >      > >>>           > have an SDK-specific coder, we basically do 
>>>>> >> > not
>>>>> >> >     know how it
>>>>> >> >      > >>>          encodes. So
>>>>> >> >      > >>>           > far, we have been using the coder instantiated
>>>>> >> >     from the
>>>>> >> >      > >>>          Proto, which is
>>>>> >> >      > >>>           > basically a LengthPrefixCoder[ByteArrayCoder] 
>>>>> >> > or
>>>>> >> >     similar[1].
>>>>> >> >      > >>>          We have had
>>>>> >> >      > >>>           > problems with this because the key encoding of
>>>>> >> >     Java SDK state
>>>>> >> >      > >>>          requests
>>>>> >> >      > >>>           > did not match the key encoding on the Runner 
>>>>> >> > side
>>>>> >> >     [2]. In an
>>>>> >> >      > >>>          attempt to
>>>>> >> >      > >>>           > fix those, it is now partly broken for 
>>>>> >> > portable
>>>>> >> >     Python pipelines.
>>>>> >> >      > >>>           > Partly, because it "only" affects non-standard
>>>>> >> >     coders.
>>>>> >> >      > >>>           >
>>>>> >> >      > >>>           > Non-standard coders yield the aforementioned
>>>>> >> >      > >>>           > LengthPrefixCoder[ByteArrayCoder]. Now, 
>>>>> >> > following
>>>>> >> >     the usual
>>>>> >> >      > >>>          encoding
>>>>> >> >      > >>>           > scheme, we would simply encode the key using 
>>>>> >> > this
>>>>> >> >     coder.
>>>>> >> >      > >>>          However, for
>>>>> >> >      > >>>           > state requests, the Python SDK leaves out the
>>>>> >> >     length prefix
>>>>> >> >      > >>>          for certain
>>>>> >> >      > >>>           > coders, e.g. for primitives like int or byte. 
>>>>> >> > It
>>>>> >> >     is possible
>>>>> >> >      > >>>          that one
>>>>> >> >      > >>>           > coder uses a length prefix, while another
>>>>> >> >     doesn't. We have no
>>>>> >> >      > >>>          way of
>>>>> >> >      > >>>           > telling from the Runner side, if a length 
>>>>> >> > prefix
>>>>> >> >     has been
>>>>> >> >      > >>>          used or not.
>>>>> >> >      > >>>           > This results in the keys to not match on the
>>>>> >> >     Runner side and the
>>>>> >> >      > >>>           > partitioning to be broken.
>>>>> >> >      > >>>           >
>>>>> >> >      > >>>           >
>>>>> >> >      > >>>           > How to solve this?
>>>>> >> >      > >>>           > ==================
>>>>> >> >      > >>>           >
>>>>> >> >      > >>>           > (1) Should this simply be fixed on the Python 
>>>>> >> > SDK
>>>>> >> >     side? One
>>>>> >> >      > >>>          fix would be
>>>>> >> >      > >>>           > to always append a length prefix to the key 
>>>>> >> > in state
>>>>> >> >      > >>>          requests, even for
>>>>> >> >      > >>>           > primitive coders like VarInt which do not use 
>>>>> >> > one.
>>>>> >> >      > >>>           >
>>>>> >> >      > >>>           > OR
>>>>> >> >      > >>>           >
>>>>> >> >      > >>>           > (2) Should the Runner detect that a 
>>>>> >> > non-standard
>>>>> >> >     coder is
>>>>> >> >      > >>>          used? If so,
>>>>> >> >      > >>>           > it should just pass the bytes from the SDK
>>>>> >> >     Harness and never
>>>>> >> >      > >>>          make an
>>>>> >> >      > >>>           > attempt to construct a coder based on the 
>>>>> >> > Proto.
>>>>> >> >      > >>>           >
>>>>> >> >      > >>>           >
>>>>> >> >      > >>>           > Thinking about it now, it seems pretty obvious
>>>>> >> >     that (2) is
>>>>> >> >      > >>>          the most
>>>>> >> >      > >>>           > feasible way to avoid complications across all
>>>>> >> >     current and
>>>>> >> >      > >>>          future SDKs
>>>>> >> >      > >>>           > for key encodings. Still, it is odd that the
>>>>> >> >     Proto contains coder
>>>>> >> >      > >>>           > information which is not usable.
>>>>> >> >      > >>>           >
>>>>> >> >      > >>>           > What do you think?
>>>>> >> >      > >>>           >
>>>>> >> >      > >>>           >
>>>>> >> >      > >>>           > Thanks,
>>>>> >> >      > >>>           > Max
>>>>> >> >      > >>>           >
>>>>> >> >      > >>>           >
>>>>> >> >      > >>>           > [0] It is possible to restart the pipeline and
>>>>> >> >     repartition the
>>>>> >> >      > >>>           > checkpointed data.
>>>>> >> >      > >>>           > [1]
>>>>> >> >      > >>>           >
>>>>> >> >      > >>>
>>>>> >> >     
>>>>> >> > https://github.com/apache/beam/blob/c39752af5391fe698a2b4f1489c187ddd4d604c0/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L682
>>>>> >> >      > >>>           > [2] 
>>>>> >> > https://issues.apache.org/jira/browse/BEAM-8157
>>>>> >> >      > >>>
>>>>> >> >

Reply via email to