This makes a *lot* of sense. But it seems to me that this is not the way Java-based runners - those that use the runners-core-construction-java module - handle it. If I interpret it correctly, the set of ModelCoders is hard-coded [1] and essentially required to be known by all SDKs [2].

There seems to be no negotiation between what the SDK harness knows and what the runner knows. The runner might be able to define the wire coder for the SDK (via the ProcessBundleDescriptor), but the SDK (for Java runners) seems to play no role in this [3]. Therefore I think that if an SDK does not know the set of Java ModelCoders, the runner and the SDK might not agree on the binary encoding of the elements (BTW, this is why Java Coders cannot be part of ModelCoders - I finally understand why I had such trouble adding them there; they simply cannot be there).

Is it possible we are missing some part of this runner-to-SDK coder negotiation in runners-core-construction-java?

[1] https://github.com/apache/beam/blob/a871a494a9935093ab5f42d88d584d32aa233b8a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java#L75

[2] https://github.com/apache/beam/blob/a871a494a9935093ab5f42d88d584d32aa233b8a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L62

[3] https://github.com/apache/beam/blob/a871a494a9935093ab5f42d88d584d32aa233b8a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L104
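
For illustration, here is a self-contained toy sketch of what I understand the logic in [1] and [2] to amount to (the CoderSpec type and makeWireCoder method are made up for the example; this is not the actual implementation):

    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    // Toy model of the "length-prefix unknown coders" idea. CoderSpec stands in
    // for the RunnerApi Coder proto; the real code in [2] works on the pipeline
    // proto and a components map, this only shows the shape of the decision.
    public class LengthPrefixSketch {

      record CoderSpec(String urn, List<CoderSpec> components) {}

      static final String LENGTH_PREFIX_URN = "beam:coder:length_prefix:v1";

      static CoderSpec makeWireCoder(CoderSpec coder, Set<String> knownUrns) {
        if (!knownUrns.contains(coder.urn())) {
          // Unknown to the runner: wrap it, so the payload travels as an opaque,
          // length-prefixed blob of SDK-encoded bytes.
          return new CoderSpec(LENGTH_PREFIX_URN, List.of(coder));
        }
        // Known (possibly composite) coder: keep it and recurse into components,
        // e.g. KV<known, unknown> becomes KV<known, LP(unknown)>.
        return new CoderSpec(
            coder.urn(),
            coder.components().stream()
                .map(c -> makeWireCoder(c, knownUrns))
                .collect(Collectors.toList()));
      }
    }

If the SDK harness does not share the same notion of which URNs are "known", the two sides can end up constructing differently shaped wire coders for the same PCollection, which is exactly the disagreement I am worried about.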

On 9/9/21 12:18 AM, Robert Bradshaw wrote:
The whole notion of an absolute set of known coders is a misnomer. It would require all Runners and SDKs to be updated synchronously for every new coder they might want to share.

Instead, what we have are

* Standard Coders which have well-defined, language-agnostic representations and encodings, which may be used for interoperability and efficiency, and
* Required Coders which are the minimum needed to execute the pipeline.

The latter consists only of bytes (for impulse), kv and iterable (for GBK), windowed value (for windowing information) and length prefix (to be able to handle anything else).
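
(For concreteness, a sketch of that minimal set by URN, as I read StandardCoders in beam_runner_api.proto - worth double-checking against the proto version in use:)

    import java.util.Set;

    // The minimal set a runner needs, by URN (assumed from StandardCoders in
    // beam_runner_api.proto; verify against the proto you build against).
    class RequiredCoders {
      static final Set<String> URNS = Set.of(
          "beam:coder:bytes:v1",           // Impulse
          "beam:coder:kv:v1",              // GroupByKey
          "beam:coder:iterable:v1",        // GroupByKey output values
          "beam:coder:windowed_value:v1",  // windowing information
          "beam:coder:length_prefix:v1");  // wrapping everything else
    }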


On Wed, Sep 8, 2021 at 3:03 PM Robert Burke <[email protected]> wrote:

    Is the claim that the Standard bytes and String_utf8 coders are
    not "known coders"?

     What's the point of the standard coders if they are not the
    canonical "known coders" that can generally be expected to be
    known by runners/other SDKs?

    
https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L790

    The Go SDK rather heavily biases towards using the standard coders
    for their closest language equivalents rather than going into
    override/custom specified soup. (It's not possible to globally
    override the coders for the '[]byte' and 'string' types, nor is
    there often reason to.)

    On Wed, Sep 8, 2021, 2:56 PM Robert Bradshaw <[email protected]> wrote:

        On Wed, Sep 8, 2021 at 1:48 PM Jack McCluskey <[email protected]> wrote:

            Hey all,

            Just catching up on the thread since I did the TestStream
            Go SDK implementation. The discussion about length
            prefixing behavior for known vs. unknown coders is
            interesting, since we ran into strings and byte slices
            getting extra length prefixes attached to them by Flink
            despite being known coders.


        Known to who?

            Based on what's been said, that isn't expected behavior,
            right?


        No, it's not.

        I would check to make sure the Go SDK is respecting the Coder
        (length prefixed or not) that's set on the channel, rather
        than guessing at what it expects it to be based on the Go type.

            On Tue, Sep 7, 2021 at 2:46 PM Jan Lukavský <[email protected]> wrote:

                On 9/7/21 6:02 PM, Reuven Lax wrote:
                Historically the DataflowRunner has been much more
                careful about not breaking update, since this is a
                frequent operation by Dataflow users. I think we've
                been less careful about other runners, but as we see
                clearly here Flink users do care about this as well,
                so we should probably test upgrade compatibility for
                Flink.

                One strategy that Dataflow uses is to avoid embedding
                the Java serialized form of a Coder in the graph, as
                this is a much higher risk of breakage (as we see
                with the issue you linked to). Possibly similar
                strategies should be investigated for Flink.
                +1, that would be great!

                Reuven

                On Mon, Sep 6, 2021 at 1:29 AM Jan Lukavský <[email protected]> wrote:

                    > Unfortunately the most basic coders (e.g.
                    bytes, string, kv, iterable)
                    > care about Context because they predated this
                    deprecation, and
                    > changing coders is hard (due to no way to
                    update the encoding for a
                    > streaming pipeline).
                    This is unrelated, but - regarding changing
                    coders due to concerns about
                    pipeline upgrades, we break this quite often, at
                    least for some runners.
                    Most recently [1].

                    > It is currently the latter for runners using
                    this code (which not all
                    > do, e.g. the ULR and Dataflow runners). I don't
                    think we want to
                    > ossify this decision as part of the spec. (Note
                    that even what's
                    > "known" and "unknown" can change from runner to
                    runner.)
                    This is interesting and unexpected to me. How do
                    runners decide how they encode elements between the
                    SDK harness and the runner? How do they inform the
                    SDK harness about this decision? My impression was
                    that this is well-defined at the model level. If
                    not, then we have found the reason for the
                    misunderstanding in this conversation. :-)

                      Jan

                    [1]
                    
https://lists.apache.org/thread.html/r51ee0bbaba2dcef13524a189c1f579f209483418a1568acff0e2c789%40%3Cdev.beam.apache.org%3E

                    On 9/4/21 7:32 PM, Robert Bradshaw wrote:
                    > On Sat, Sep 4, 2021 at 6:52 AM Jan Lukavský <[email protected]> wrote:
                    >> On 9/3/21 9:50 PM, Robert Bradshaw wrote:
                    >>
                    >>> On Fri, Sep 3, 2021 at 11:42 AM Jan Lukavský <[email protected]> wrote:
                    >>>> Hi Robert,
                    >>>>
                    >>>>> There's another hitch here for TestStream.
                    For historical reasons,
                    >>>>> coders actually represent two encodings:
                    nested (aka self delimiting)
                    >>>>> and unnested. TestStream elements are given
                    as unnested encoded bytes,
                    >>>>> but the nested encoding is required for
                    sending data to the SDK. The
                    >>>>> runner can't go from <nested encoding> to
                    <unnested encoding> for an
                    >>>>> arbitrary unknown coder.
                    >>>>>
                    >>>>> (Even if it weren't for this complication,
                    to be able to send already
                    >>>>> encoded bytes of an unknown coder to the
                    SDK will also complicate the
                    >>>>> logic in choosing the coder to be used for
                    the channel and sending the
                    >>>>> data, which is some of what you're running
                    into (but can be solved
                    >>>>> differently for inlined reads as the coder
                    can always be known by the
                    >>>>> runner).)
                    >>>> It is hard for me to argue with "historical
                    reasons". But - the "nested"
                    >>>> and "unnested" coders look very similar to
                    SDK-coder and runner-coder
                    >>>> spaces.
                    >>> Unfortunately, they're actually orthogonal to
                    that.
                    >> Hm, do you mean the Context passed to the
                    encode/decode method? [1] That
                    >> seems to be deprecated, I assume that most
                    coders use the default
                    >> implementation and simply ignore the Context?
                    > Unfortunately the most basic coders (e.g.
                    bytes, string, kv, iterable)
                    > care about Context because they predated this
                    deprecation, and
                    > changing coders is hard (due to no way to
                    update the encoding for a
                    > streaming pipeline).
                    >
                    >> Even if not - whether or
                    >> not the elements are encoded using NESTED
                    Context or UNNESTED Context
                    >> should be part of the contract of TestStream,
                    right? Most likely it is
                    >> the UNNESTED one, if I understand correctly
                    what that does. Under what
                    >> conditions is the deprecated encode/decode
                    method used?
                    > Yes, it's the UNNESTED one.
                    >
                    >> [1]
                    >>
                    
https://github.com/apache/beam/blob/c4e0b4ac0777f37f5eb775a8a83c56f66b3baac3/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L134
                    >>
                    >>>> The runner's responsibility is not to go
                    from "<nested
                    >>>> encoding>" (SDK coder) to "<unnested
                    encoding>" for arbitrary coder.
                    >>>> That is really impossible. But a coder is a
                    function, right? Function
                    >>>> maps from universe A to universe B (in
                    general). TestStream provides a
                    >>>> set of elements, and these elements are the
                    "universe". For those
                    >>>> elements it also provides the encoded form,
                    which can be interpreted as
                    >>>> the definition of the coder.
                    >>> The problem here is that there is not "the
                    encoded form" for a Coder
                    >>> but two encoded forms, and we have the wrong
                    one. Things could be made
                    >>> to work if we had the other.
                    >> Which two encoded forms do you refer to?
                    Elements encoded by both the
                    >> SDK-coder and runner-coder (and I ignore the
                    Context here once again)
                    >> have the same binary representation (which
                    they must have, otherwise it
                    >> would be impossible to decode elements coming
                    from the runner to the
                    >> SDK-harness or vice-versa).
                    >>>> Therefore - technically (and formally) -
                    >>>> the SDK coder for the TestStream is known to
                    the runner, regardless of
                    >>>> the language the runner is written in.
                    >>>>
                    >>>> To move  this discussion forward, I think we
                    should look for answers to
                    >>>> the following questions:
                    >>>>
                    >>>>     a) do we have any clues that show that
                    the proposed "in runner"
                    >>>> solution will not work?
                    >>> OK, thinking about it some more, in the
                    TestStream, we can use the
                    >>> happy coincidence that
                    >>>
                    >>>   LengthPrefixed(C).encode(x, nested=True) ==
                    >>>     VarLong.encode(len(C.encode(x, nested=False))) || C.encode(x, nested=False)
                    >>>
                    >>> (where || denotes concatenation) and the fact
                    that we have
                    >>>
                    >>>       C.encode(x, nested=False)
                    >>>
                    >>> in hand.
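
                    (As a side note, this identity is easy to check
                    directly with the Java SDK classes - a minimal
                    sketch, assuming LengthPrefixCoder, StringUtf8Coder,
                    CoderUtils and VarInt behave as in the current code
                    base; worth verifying against the version you use:)

                    import java.io.ByteArrayOutputStream;
                    import java.util.Arrays;
                    import org.apache.beam.sdk.coders.Coder;
                    import org.apache.beam.sdk.coders.LengthPrefixCoder;
                    import org.apache.beam.sdk.coders.StringUtf8Coder;
                    import org.apache.beam.sdk.util.CoderUtils;
                    import org.apache.beam.sdk.util.VarInt;

                    public class LengthPrefixIdentityCheck {
                      public static void main(String[] args) throws Exception {
                        StringUtf8Coder c = StringUtf8Coder.of();
                        String x = "example";

                        // C.encode(x, nested=False): the raw, non-self-delimiting form.
                        byte[] unnested = CoderUtils.encodeToByteArray(c, x, Coder.Context.OUTER);

                        // VarLong.encode(len(...)) || C.encode(x, nested=False)
                        ByteArrayOutputStream manual = new ByteArrayOutputStream();
                        VarInt.encode(unnested.length, manual);
                        manual.write(unnested);

                        // LengthPrefixed(C).encode(x, nested=True)
                        ByteArrayOutputStream viaLp = new ByteArrayOutputStream();
                        LengthPrefixCoder.of(c).encode(x, viaLp, Coder.Context.NESTED);

                        // Expected to print true.
                        System.out.println(Arrays.equals(manual.toByteArray(), viaLp.toByteArray()));
                      }
                    }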
                    >>>
                    >>> A possible fix here for the OP's question is
                    that when rehydrating the
                    >>> TestStream transform it must behave
                    differently according to the coder
                    >>> used in the subsequent channel (e.g. for
                    known coders, it decodes the
                    >>> elements and emits them directly, but for
                    unknown coders, it prefixes
                    >>> them with their length and emits byte
                    strings). It gets more
                    >>> complicated for nested coders, e.g. for a
                    KV<known-coder,
                    >>> unknown-coder> the channel might be
                    LP(KV<known-coder, unknown-coder>)
                    >>> or KV<known-coder, LP(unknown-coder)> which
                    have different encodings
                    >>> (and the latter, which is the default,
                    requires transcoding the bytes
                    >>> to inject the length in the middle which is
                    found by decoding the
                    >>> first component). As well as getting more
                    complex, this really seems
                    >>> to violate the spirit of separation of concerns.
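
                    (Just to illustrate the two shapes mentioned above -
                    a sketch using the Java SDK coders, with
                    ByteArrayCoder standing in for the "unknown" coder;
                    this is only an illustration, not how the wire coder
                    is actually constructed:)

                    import java.util.Arrays;
                    import org.apache.beam.sdk.coders.ByteArrayCoder;
                    import org.apache.beam.sdk.coders.Coder;
                    import org.apache.beam.sdk.coders.KvCoder;
                    import org.apache.beam.sdk.coders.LengthPrefixCoder;
                    import org.apache.beam.sdk.coders.StringUtf8Coder;
                    import org.apache.beam.sdk.util.CoderUtils;
                    import org.apache.beam.sdk.values.KV;

                    public class LpPlacementSketch {
                      public static void main(String[] args) throws Exception {
                        KV<String, byte[]> kv = KV.of("key", new byte[] {1, 2, 3});

                        // LP(KV<known, unknown>): one length prefix around the whole pair.
                        Coder<KV<String, byte[]>> lpWhole =
                            LengthPrefixCoder.of(KvCoder.of(StringUtf8Coder.of(), ByteArrayCoder.of()));

                        // KV<known, LP(unknown)>: the prefix sits only around the unknown component.
                        Coder<KV<String, byte[]>> lpValue =
                            KvCoder.of(StringUtf8Coder.of(), LengthPrefixCoder.of(ByteArrayCoder.of()));

                        // The byte sequences differ; going from one form to the other requires
                        // decoding the first component to find where the second one starts.
                        System.out.println(Arrays.toString(CoderUtils.encodeToByteArray(lpWhole, kv)));
                        System.out.println(Arrays.toString(CoderUtils.encodeToByteArray(lpValue, kv)));
                      }
                    }
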
                    >> How do we decide whether the channel is
                    LP(KV<..>) or
                    >> KV<LP(unknown), known>? From my understanding
                    it is always the latter,
                    >> because of [2].
                    >>
                    >> [2]
                    >>
                    
https://github.com/apache/beam/blob/c4e0b4ac0777f37f5eb775a8a83c56f66b3baac3/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L48
                    > It is currently the latter for runners using
                    this code (which not all
                    > do, e.g. the ULR and Dataflow runners). I don't
                    think we want to
                    > ossify this decision as part of the spec. (Note
                    that even what's
                    > "known" and "unknown" can change from runner to
                    runner.)
                    >
                    >>>>     b) do we think that it will not be
                    robust enough to incorporate the
                    >>>> other use-cases (like generic transform
                    inlining, taking into account
                    >>>> that this applies only to runners that are
                    written in the same language
                    >>>> as the submitting SDK, because otherwise,
                    there is nothing to inline)?
                    >>> Being in the same language is not a
                    prerequisite to "inlining," e.g.
                    >>> the PubSub source on Dataflow is recognized
                    as such and not executed
                    >>> as SDK code but natively.
                    >> Agree, that is actually exactly what happens
                    with the TestStream. The
                    >> transform need not be in the same language, as
                    long as it is completely
                    >> understood by the runner, including the
                    SDK-coder (either explicitly -
                    >> which might be due to the PCollection coder
                    being composed of well-known
                    >> coders only, or implicitly like in the case of
                    TestStream, where the
                    >> elements are encoded using the SDK coder).
                    >>> It is more likely that inlining occurs in the
                    same language if there
                    >>> are UDFs involved.
                    >>>
                    >>>> I'm convinced that the TestStream-decode
                    expansion solution is an
                    >>>> ad-hoc solution to a generic problem, which
                    is why I'm still bothering
                    >>>> this mailing list with my emails on this. :-)
                    >>>>
                    >>>> WDYT?
                    >>> While not a solution to the general problem,
                    I think the
                    >>> TestStream-only-does-bytes approach simplifies its
                    definition (primitives
                    >>> should have as simple/easy to implement
                    definitions as possible) and
                    >>> brings it closer to the other root we have:
                    Impulse. (We could go a
                    >>> step further and rather than emitting encoded
                    elements, with the data
                    >>> in the proto itself, it emits sequence
                    numbers, and a subsequent ParDo
                    >>> maps those to concrete elements (e.g. via an
                    in-memory map), but that
                    >>> further step doesn't buy much...)
                    >>>
                    >>> Only runners that want to do inlining would
                    have to take on the
                    >>> complexity of a fully generic solution.
                    >> I think that if the simplification brings
                    something, we can do that, but
                    >> I'd like to understand why we cannot (or
                    should not) use the generic
                    >> solution. I think it definitely *should* be
                    possible to use a generic
                    >> solution, because otherwise the solution would
                    not be generic. And it
                    >> would imply that we are unable to do generic
                    transform inlining, which
                    >> I would find really strange. That would
                    immediately mean that we are
                    >> unable to construct a classical runner as a
                    special case of the portable
                    >> one, which would be bad I think.
                    >>
                    >> Are the elements in the TestStreamPayload
                    encoded with the pure SDK-coder,
                    >> or do they go through the
                    LengthPrefixUnknownCoders logic? If not,
                    >> then the problem would be there, because that
                    means that the SDK-coder
                    >> cannot be (implicitly) defined in the runner.
                    If the elements were
                    >> encoded using LP, then it would be possible to
                    decode them using
                    >> runner-coder and the problem should be solved,
                    or am I still missing
                    >> some key parts?
                    > Yes, the problem is precisely that there are
                    (unspecified) constraints
                    > on the coder used by the TestStreamPayload.
                    Just requiring that it be
                    > length prefixed is not enough; you have to make
                    constraints on
                    > sometimes pushing down the length prefixing if
                    it's a composite (like
                    > a KV) that depend on what the runner is
                    expected to support in terms
                    > of composites and/or the choices it makes for
                    the channel (and the
                    > runner, not knowing the coder, can't transcode
                    between these choices).
                    >
                    > The simpler solution is to constrain this coder
                    to just be byte[]
                    > rather than let it be a little bit flexible
                    (but not wholly flexible).
                    >
                    > As for a fully generic solution, I think the
                    issues encountered with
                    > inlining Read vs. TestStream are related to
                    this, but not really the
                    > same. With TestStream one has an encoded
                    representation of the
                    > elements, provided by the SDK to the Runner,
                    and no SDK
                    > representation/execution, whereas with the Reads
                    one has unencoded
                    > elements in hand and a Coder that is understood
                    by both (so long as
                    > the channel can be negotiated correctly). FWIW,
                    I think the proper
                    > solution to inlining a Read (or other Transform
                    that would typically
                    > be executed in the SDK) is to treat it as a
                    special environment (where
                    > we know logically it can work) and then elide,
                    where possible, the
                    > various encodings, grpc calls, etc. that are
                    unneeded as everything is
                    > in process.
                    >
                    >>>> On 9/3/21 7:03 PM, Robert Bradshaw wrote:
                    >>>>> On Fri, Sep 3, 2021 at 2:40 AM Jan Lukavský <[email protected]> wrote:
                    >>>>>> On 9/3/21 1:06 AM, Robert Bradshaw wrote:
                    >>>>>>> On Thu, Sep 2, 2021 at 1:03 AM Jan Lukavský <[email protected]> wrote:
                    >>>>>>>> Hi,
                    >>>>>>>>
                    >>>>>>>> I had some more time thinking about this
                    and I'll try to recap that.
                    >>>>>>>> First some invariants:
                    >>>>>>>>
                    >>>>>>>>    a) each PCollection<T> has actually
                    two coders - an _SDK coder_ and a
                    >>>>>>>> _runner coder_. These coders have the
                    property that each one can
                    >>>>>>>> _decode_ what the other encoded, but the
                    opposite is not true: the
                    >>>>>>>> coders cannot _encode_ what the other
                    _decoded_ (in general).
                    >>>>>>>>
                    >>>>>>>>    b) when a PCollection<T> is computed
                    inside an environment, the
                    >>>>>>>> elements are encoded using the SDK coder on
                    the SDK-harness side and
                    >>>>>>>> decoded using the runner coder after
                    being received by the runner
                    >>>>>>>>
                    >>>>>>>>    c) under specific circumstances, the
                    encode-decode step can be
                    >>>>>>>> optimized out; that is the case where
                    the SDK coder and all its
                    >>>>>>>> subcoders are all well-known to the
                    runner (at present, that means
                    >>>>>>>> that all the parts are present in the model
                    coders set). The reason for that
                    >>>>>>>> is that in this specific situation
                    runner_decode(sdk_encode(X)) = X.
                    >>>>>>>> This property is essential.
                    >>>>>>> However, in general, X can only pass from
                    the SDK to the runner (or
                    >>>>>>> vice versa) in encoded form.
                    >>>>>> In general yes, but we are (mostly)
                    talking about transform inlining here, so
                    >>>>>> in that particular situation, the elements
                    might be passed in decoded form.
                    >>>>>>>>    d) from b) it immediately follows that
                    when a PTransform does not run in
                    >>>>>>>> an environment (and this might be due to
                    the transform being runner
                    >>>>>>>> native, inlined, or a source (e.g. Impulse or
                    TestStream)) the elements have
                    >>>>>>>> to be encoded by the SDK coder, immediately
                    followed by a decode by the runner
                    >>>>>>>> coder. That (surprisingly) applies even
                    to situations when the runner is
                    >>>>>>>> implemented using a different language
                    than the client SDK, because it
                    >>>>>>>> implies that the type of produced
                    elements must be one of the types encoded
                    >>>>>>>> using model coders (well-known to the
                    runner, otherwise the SDK will not
                    >>>>>>>> be able to consume it). But - due to
                    property c) - this means that this
                    >>>>>>>> encode-decode step can be optimized out.
                    This does not mean that it is
                    >>>>>>>> not (logically) present, though. This is
                    exactly the case of the native
                    >>>>>>>> Impulse transform.
                    >>>>>>>>
                    >>>>>>>> Now, from that we can conclude that on
                    the boundary between executable
                    >>>>>>>> stages, or between a runner (inlined)
                    transform and an executable stage, each
                    >>>>>>>> PCollection has to be encoded using the SDK
                    coder and immediately decoded by the
                    >>>>>>>> runner coder, *unless this can be
                    optimized out* by property c).
                    >>>>>>>>
                    >>>>>>>> This gives us two options where to
                    implement this encode/decode step:
                    >>>>>>>>
                    >>>>>>>>    1) completely inside the runner, with the
                    possibility to optimize the
                    >>>>>>>> encode/decode step to the identity under the
                    right circumstances
                    >>>>>>>>
                    >>>>>>>>    2) partly in the runner and partly in
                    the SDK - that is, we encode
                    >>>>>>>> elements of the PCollection using the SDK coder
                    into bytes, pass those to the
                    >>>>>>>> SDK harness and apply a custom decode
                    step there. This works because SDK-coder
                    >>>>>>>> encoded elements are byte[],
                    and that is a well-known coder type.
                    >>>>>>>> We again only leverage property c) and
                    optimize out the SDK-coder encode,
                    >>>>>>>> runner-coder decode step.
                    >>>>>>>>
                    >>>>>>>> Option 2) is exactly the proposal of
                    TestStream producing byte[] and
                    >>>>>>>> decoding inside the SDK-harness; the
                    TestStream is actually an inlined
                    >>>>>>>> transform, and the elements are produced
                    directly in the runner (the SDK coder
                    >>>>>>>> is not known to the runner, but that
                    does not matter, because the
                    >>>>>>>> elements are already encoded by the client).
                    >>>>>>>>
                    >>>>>>>>   From the above it seems to me that
                    option 1) should be preferred, because:
                    >>>>>>>>
                    >>>>>>>>    i) it is generic, applicable to all
                    inlined transforms and any sources
                    >>>>>>>>
                    >>>>>>>>    ii) it is consistent with how things
                    logically work underneath
                    >>>>>>>>
                    >>>>>>>>    iii) it offers better room for
                    optimization - option 2) might result
                    >>>>>>>> in cases when the elements are passed
                    from the runner to the SDK-harness
                    >>>>>>>> only for the sake of decoding from the
                    SDK coder, immediately
                    >>>>>>>> encoding back using the SDK coder, and
                    returning them to the runner. This
                    >>>>>>>> would be the case when the TestStream is
                    directly consumed by an inlined
                    >>>>>>>> (or external) transform.
                    >>>>>>> (1) is not possible if the Coder in
                    question is not known to the
                    >>>>>>> Runner, which is why I proposed (2).
                    >>>>>> There is no particular need for the coder
                    to be known. If a transform is
                    >>>>>> to be inlined, what *has* to be known is
                    the SDK-encoded form of the
                    >>>>>> elements. That holds true if:
                    >>>>>>
                    >>>>>>      a) either the SDK coder is known, or
                    >>>>>>
                    >>>>>>      b) the encoded form of the produced
                    elements is known in advance
                    >>>>>>
                    >>>>>> For TestStream it is case b). For an
                    inlined primitive Read (or any
                    >>>>>> other transform which executes code) it is case a).
                    >>>>> There's another hitch here for TestStream.
                    For historical reasons,
                    >>>>> coders actually represent two encodings:
                    nested (aka self delimiting)
                    >>>>> and unnested. TestStream elements are given
                    as unnested encoded bytes,
                    >>>>> but the nested encoding is required for
                    sending data to the SDK. The
                    >>>>> runner can't go from <nested encoding> to
                    <unnested encoding> for an
                    >>>>> arbitrary unknown coder.
                    >>>>>
                    >>>>> (Even if it weren't for this complication,
                    to be able to send already
                    >>>>> encoded bytes of an unknown coder to the
                    SDK will also complicate the
                    >>>>> logic in choosing the coder to be used for
                    the channel and sending the
                    >>>>> data, which is some of what you're running
                    into (but can be solved
                    >>>>> differently for inlined reads as the coder
                    can always be known by the
                    >>>>> runner).)


