Hi,

picking this up once again, as it seems to me that the problem is even more complex than simply consulting _one_ (producer) SDK for capabilities. Because of cross-language pipelines, the SDK that produces the data can have a different set of standard coders than the SDK that will consume the data. Moreover, there may be multiple consumers of the same PCollection written in different SDKs, which would mean that we need to take the intersection of the standard coders of the producer and all consumers to deduce the final wire coder. Is this correct? Do we have a tracking Jira for fixing the Java and Python runner-core?
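
The intersection described above can be sketched roughly as follows. This is only an illustration, assuming each environment advertises the coder URNs it understands as a plain set; the function name shared_coder_urns and the example capability sets are hypothetical and not taken from Beam's code.

```python
from typing import Iterable, Set


def shared_coder_urns(producer_caps: Iterable[str],
                      consumer_caps: Iterable[Iterable[str]]) -> Set[str]:
    """Return the coder URNs understood by the producer and by every consumer."""
    shared = set(producer_caps)
    for caps in consumer_caps:
        shared &= set(caps)
    return shared


# Hypothetical capability sets for three SDK environments.
producer = {"beam:coder:bytes:v1", "beam:coder:string_utf8:v1", "beam:coder:row:v1"}
consumer_a = {"beam:coder:bytes:v1", "beam:coder:string_utf8:v1"}
consumer_b = {"beam:coder:bytes:v1", "beam:coder:row:v1"}

# Only coders in this set could be left un-prefixed on the wire.
wire_safe = shared_coder_urns(producer, [consumer_a, consumer_b])
```

With these made-up sets only the bytes coder survives, so anything else on that PCollection would have to be length prefixed.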

 Jan

On 9/9/21 7:29 PM, Robert Bradshaw wrote:
You are right, Java does not implement this correctly. It should be querying the capabilities section of the environment proto. (For Java environments, this is populated from ModelCoders, e.g. https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java#L386 )

Looks like Python doesn't do any better: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L459



On Thu, Sep 9, 2021 at 10:13 AM Jan Lukavský <[email protected]> wrote:

    This makes a *lot* of sense. But it seems to me, that this is not
    the way Java-based runners - that use
    runners-core-construction-java module - handle it. If I interpret
    it correctly, then the set of ModelCoders is hard-coded [1] and
    essentially required to be known by all SDKs [2].

    There seems to be no negotiation between what SDK harness knows
    and what the runner knows. The runner might be able to define the
    wire coder for the SDK (via the ProcessBundleDescriptor), but the
    SDK (for Java runners) seems not to be able to play any role in
    this [3]. Therefore I think that if an SDK does not know the set
    of Java ModelCoders, then the runner and the SDK might not agree
    on the binary encoding of the elements (BTW, which is why Java
    Coders cannot be part of ModelCoders and I finally understand why
    I had such troubles adding it there - it cannot be there).

    Is it possible we are missing some part of this runner-to-sdk
    coder negotiation in runners-core-construction-java?

    [1]
    
https://github.com/apache/beam/blob/a871a494a9935093ab5f42d88d584d32aa233b8a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java#L75
    

    [2]
    
https://github.com/apache/beam/blob/a871a494a9935093ab5f42d88d584d32aa233b8a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L62
    

    [3]
    
https://github.com/apache/beam/blob/a871a494a9935093ab5f42d88d584d32aa233b8a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L104
    

    On 9/9/21 12:18 AM, Robert Bradshaw wrote:
    The whole notion of an absolute set of known coders is a
    misnomer. It would require all Runners and SDKs to be updated
    synchronously for every new coder they might want to share.

    Instead, what we have are

    * Standard Coders which have well-defined, language-agnostic
    representations and encodings, which may be used for
    interoperability and efficiency, and
    * Required Coders which are the minimum needed to execute the
    pipeline.

    The latter consists only of bytes (for impulse), kv and iterable
    (for GBK), windowed value (for windowing information) and
    length prefix (to be able to handle anything else).
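
To make the required-versus-unknown distinction concrete, here is a minimal sketch of wrapping every coder a peer does not understand in a length prefix. It is a simplified model and not Beam's LengthPrefixUnknownCoders implementation: CoderSpec, length_prefix_unknown and the "custom:my_coder" URN are hypothetical names, and the URN strings are assumed to match the standard ones.

```python
from dataclasses import dataclass
from typing import FrozenSet, Tuple

LENGTH_PREFIX = "beam:coder:length_prefix:v1"
KV = "beam:coder:kv:v1"
STRING = "beam:coder:string_utf8:v1"

# The required coders listed above: bytes, kv, iterable, windowed value, length prefix.
REQUIRED: FrozenSet[str] = frozenset({
    "beam:coder:bytes:v1", KV, "beam:coder:iterable:v1",
    "beam:coder:windowed_value:v1", LENGTH_PREFIX,
})


@dataclass(frozen=True)
class CoderSpec:
    """Toy coder description: a URN plus component coders."""
    urn: str
    components: Tuple["CoderSpec", ...] = ()


def length_prefix_unknown(coder: CoderSpec, known: FrozenSet[str]) -> CoderSpec:
    """Wrap each coder whose URN is not in `known` in a length prefix."""
    if coder.urn == LENGTH_PREFIX:
        return coder  # already self-delimiting, leave as is
    if coder.urn not in known:
        return CoderSpec(LENGTH_PREFIX, (coder,))
    return CoderSpec(
        coder.urn,
        tuple(length_prefix_unknown(c, known) for c in coder.components),
    )


custom = CoderSpec("custom:my_coder")
kv = CoderSpec(KV, (CoderSpec(STRING), custom))

# A peer that also advertises string_utf8 keeps the key un-prefixed.
with_string = length_prefix_unknown(kv, REQUIRED | {STRING})
# A peer that only knows the required coders gets both components prefixed.
required_only = length_prefix_unknown(kv, REQUIRED)
```

The same pipeline coder therefore yields different wire coders depending on which set of URNs is treated as known, which is why that set has to be agreed on per channel.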


    On Wed, Sep 8, 2021 at 3:03 PM Robert Burke <[email protected]> wrote:

        Is the claim that the Standard bytes and String_utf8 coders
        are not "known coders"?

         What's the point of the standard coders if they are not the
        canonical "known coders" that can generally be expected to be
        known by runners/other SDKs?

        
https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L790
        

        The Go SDK rather heavily biases towards using the standard
        coders for their closest language equivalents rather than
        going into override/custom specified soup. (It's not possible
        to globally override the coders for the '[]byte' and 'string'
        types, nor is there often reason to.)

        On Wed, Sep 8, 2021, 2:56 PM Robert Bradshaw
        <[email protected]> wrote:

            On Wed, Sep 8, 2021 at 1:48 PM Jack McCluskey
            <[email protected]>
            wrote:

                Hey all,

                Just catching up on the thread since I did the
                TestStream Go SDK implementation. The discussion
                about length prefixing behavior for known vs. unknown
                coders is interesting, since we ran into strings and
                byte slices getting extra length prefixes attached to
                them by Flink despite being known coders.


            Known to who?

                Based on what's been said, that isn't expected
                behavior, right?


            No, it's not.

            I would check to make sure the Go SDK is respecting the
            Coder (length prefixed or not) that's set on the channel,
            rather than guessing at what it expects it to be based on
            the Go type.

                On Tue, Sep 7, 2021 at 2:46 PM Jan Lukavský
                <[email protected]> wrote:

                    On 9/7/21 6:02 PM, Reuven Lax wrote:
                    Historically the DataflowRunner has been much
                    more careful about not breaking update, since
                    this is a frequent operation by Dataflow users.
                    I think we've been less careful about other
                    runners, but as we see clearly here Flink users
                    do care about this as well, so we should
                    probably test upgrade compatibility for Flink.

                    One strategy that Dataflow uses is to avoid
                    embedding the Java serialized form of a Coder in
                    the graph, as this is a much higher risk of
                    breakage (as we see with the issue you linked
                    to). Possibly similar strategies should be
                    investigated for Flink.
                    +1, that would be great!

                    Reuven

                    On Mon, Sep 6, 2021 at 1:29 AM Jan Lukavský
                    <[email protected]> wrote:

                        > Unfortunately the most basic coders (e.g.
                        bytes, string, kv, iterable)
                        > care about Context because they predated
                        this deprecation, and
                        > changing coders is hard (due to no way to
                        update the encoding for a
                        > streaming pipeline).
                        This is unrelated, but - regarding changing
                        coders due to concerns about
                        pipeline upgrades, we break this quite
                        often, at least for some runners.
                        Most recently [1].

                        > It is currently the latter for runners
                        using this code (which not all
                        > do, e.g. the ULR and Dataflow runners). I
                        don't think we want to
                        > ossify this decision as part of the spec.
                        (Note that even what's
                        > "known" and "unknown" can change from
                        runner to runner.)
                        This is interesting and unexpected for me.
                        How do runners decide about
                        how they encode elements between SDK harness
                        and the runner? How do they
                        inform the SDK harness about this decision?
                        My impression was that this
                        is well-defined at the model level. If not,
                        then we have the reason for
                        misunderstanding in this conversation. :-)

                          Jan

                        [1]
                        
https://lists.apache.org/thread.html/r51ee0bbaba2dcef13524a189c1f579f209483418a1568acff0e2c789%40%3Cdev.beam.apache.org%3E
                        

                        On 9/4/21 7:32 PM, Robert Bradshaw wrote:
                        > On Sat, Sep 4, 2021 at 6:52 AM Jan
                        Lukavský <[email protected]> wrote:
                        >> On 9/3/21 9:50 PM, Robert Bradshaw wrote:
                        >>
                        >>> On Fri, Sep 3, 2021 at 11:42 AM Jan
                        Lukavský <[email protected]> wrote:
                        >>>> Hi Robert,
                        >>>>
                        >>>>> There's another hitch here for
                        TestStream. For historical reasons,
                        >>>>> coders actually represent two
                        encodings: nested (aka self delimiting)
                        >>>>> and unnested. TestStream elements are
                        given as unnested encoded bytes,
                        >>>>> but the nested encoding is required
                        for sending data to the SDK. The
                        >>>>> runner can't go from <nested encoding>
                        to <unnested encoding> for an
                        >>>>> arbitrary unknown coder.
                        >>>>>
                        >>>>> (Even if it weren't for this
                        complication, to be able to send already
                        >>>>> encoded bytes of an unknown coder to
                        the SDK will also complicate the
                        >>>>> logic in choosing the coder to be used
                        for the channel and sending the
                        >>>>> data, which is some of what you're
                        running into (but can be solved
                        >>>>> differently for inlined reads as the
                        coder can always be known by the
                        >>>>> runner).)
                        >>>> It is hard for me to argue with
                        "historical reasons". But - the "nested"
                        >>>> and "unnested" coders look very similar
                        to SDK-coder and runner-coder
                        >>>> spaces.
                        >>> Unfortunately, they're actually
                        orthogonal to that.
                        >> Hm, do you mean the Context passed to the
                        encode/decode method? [1] That
                        >> seems to be deprecated, I assume that
                        most coders use the default
                        >> implementation and simply ignore the Context?
                        > Unfortunately the most basic coders (e.g.
                        bytes, string, kv, iterable)
                        > care about Context because they predated
                        this deprecation, and
                        > changing coders is hard (due to no way to
                        update the encoding for a
                        > streaming pipeline).
                        >
                        >> Even if not - whether or
                        >> not the elements are encoded using NESTED
                        Context or UNNESTED Context
                        >> should be part of the contract of
                        TestStream, right? Most likely it is
                        >> the UNNESTED one, if I understand
                        correctly what that does. Under what
                        >> conditions is the deprecated
                        encode/decode method used?
                        > Yes, it's the UNNESTED one.
                        >
                        >> [1]
                        >>
                        
https://github.com/apache/beam/blob/c4e0b4ac0777f37f5eb775a8a83c56f66b3baac3/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L134
                        
                        >>
                        >>>> The runner's responsibility is not to
                        go from "<nested
                        >>>> encoding>" (SDK coder) to "<unnested
                        encoding>" for arbitrary coder.
                        >>>> That is really impossible. But a coder
                        is a function, right? Function
                        >>>> maps from universe A to universe B (in
                        general). TestStream provides a
                        >>>> set of elements, and these elements are
                        the "universe". For those
                        >>>> elements it also provides the encoded
                        form, which can be interpreted as
                        >>>> the definition of the coder.
                        >>> The problem here is that there is not
                        "the encoded form" for a Coder
                        >>> but two encoded forms, and we have the
                        wrong one. Things could be made
                        >>> to work if we had the other.
                        >> Which two encoded forms do you refer to?
                        Elements encoded by both the
                        >> SDK-coder and runner-coder (and I ignore
                        the Context here once again)
                        >> have the same binary representation
                        (which they must have, otherwise it
                        >> would be impossible to decode elements
                        coming from the runner to the
                        >> SDK-harness or vice-versa).
                        >>>> Therefore - technically (and formally) -
                        >>>> the SDK coder for the TestStream is
                        known to the runner, regardless of
                        >>>> the language the runner is written in.
                        >>>>
                        >>>> To move this discussion forward, I
                        think we should look for answers to
                        >>>> the following questions:
                        >>>>
                        >>>>     a) do we have any clues that show,
                        that the proposed "in runner"
                        >>>> solution will not work?
                        >>> OK, thinking about it some more, in the
                        TestStream, we can use the
                        >>> happy coincidence that
                        >>>
                        >>>  LengthPrefixed(C).encode(x, nested=True) ==
                        >>> VarLong.encode(len(C.encode(x,
                        nested=False))) || C.encode(x,
                        >>> nested=False)
                        >>>
                        >>> (where || denotes concatenation) and the
                        fact that we have
                        >>>
                        >>>  C.encode(x, nested=False)
                        >>>
                        >>> in hand.
                        >>>
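
The identity above can be illustrated with a short sketch. It assumes the length is written as a base-128 varint (least significant group first), which is what VarLong is taken to mean here for non-negative lengths; encode_varint and length_prefix are illustrative helper names, not Beam APIs.

```python
def encode_varint(n: int) -> bytes:
    """Encode a non-negative integer as a base-128 varint."""
    if n < 0:
        raise ValueError("length must be non-negative")
    out = bytearray()
    while True:
        bits = n & 0x7F
        n >>= 7
        if n:
            out.append(bits | 0x80)  # more groups follow
        else:
            out.append(bits)
            return bytes(out)


def length_prefix(unnested: bytes) -> bytes:
    """Build LengthPrefixed(C).encode(x, nested=True) from C.encode(x, nested=False)."""
    return encode_varint(len(unnested)) + unnested


# The runner only holds the unnested bytes of an element of an unknown coder.
unnested_element = b"abc"
on_the_wire = length_prefix(unnested_element)
```

The point is that the runner never has to understand C: prepending the length to the bytes it already holds is enough.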
                        >>> A possible fix here for the OP's
                        question is that when rehydrating the
                        >>> TestStream transform it must behave
                        differently according to the coder
                        >>> used in the subsequent channel (e.g. for
                        known coders, it decodes the
                        >>> elements and emits them directly, but
                        for unknown coders, it prefixes
                        >>> them with their length and emits byte
                        strings). It gets more
                        >>> complicated for nested coders, e.g. for
                        a KV<known-coder,
                        >>> unknown-coder> the channel might be
                        LP(KV<known-coder, unknown-coder>)
                        >>> or KV<known-coder, LP(unknown-coder)>
                        which have different encodings
                        >>> (and the latter, which is the default,
                        requires transcoding the bytes
                        >>> to inject the length in the middle which
                        is found by decoding the
                        >>> first component). As well as getting
                        more complex, this really seems
                        >>> to violate the spirit of separation of
                        concerns.
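
A toy model of the two channel layouts shows why they differ and why the second one needs the key to be decoded. The assumptions are loud ones: the known key coder is taken to be a UTF-8 string whose nested form is varint-length-prefixed, the unknown value is opaque bytes in its unnested form at the end of the payload, and all function names are hypothetical.

```python
from typing import Tuple


def encode_varint(n: int) -> bytes:
    """Encode a non-negative integer as a base-128 varint."""
    out = bytearray()
    while True:
        bits = n & 0x7F
        n >>= 7
        if n:
            out.append(bits | 0x80)
        else:
            out.append(bits)
            return bytes(out)


def decode_varint(buf: bytes, pos: int = 0) -> Tuple[int, int]:
    """Decode a varint starting at `pos`; return (value, next position)."""
    result = shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def encode_kv_unnested(key: str, value_bytes: bytes) -> bytes:
    """Toy KV encoding: nested key followed by the unnested value bytes."""
    k = key.encode("utf-8")
    return encode_varint(len(k)) + k + value_bytes


def as_lp_of_kv(payload: bytes) -> bytes:
    """Layout LP(KV<known, unknown>): one prefix around the whole payload."""
    return encode_varint(len(payload)) + payload


def as_kv_with_lp_value(payload: bytes) -> bytes:
    """Layout KV<known, LP(unknown)>: the key must be parsed to find the value."""
    key_len, pos = decode_varint(payload)
    value_start = pos + key_len
    value = payload[value_start:]
    return payload[:value_start] + encode_varint(len(value)) + value


payload = encode_kv_unnested("k", b"vv")
whole_prefixed = as_lp_of_kv(payload)
value_prefixed = as_kv_with_lp_value(payload)
```

The first layout is a blind prepend, while the second requires understanding the key coder well enough to skip over it, which is the transcoding step mentioned above.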
                        >> How do we make the decision if the
                        channel is LP<KV<..>> or
                        >> KV<LP<unknown>, known>? From my
                        understanding it is always the latter,
                        >> because of [2].
                        >>
                        >> [2]
                        >>
                        
https://github.com/apache/beam/blob/c4e0b4ac0777f37f5eb775a8a83c56f66b3baac3/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L48
                        
                        > It is currently the latter for runners
                        using this code (which not all
                        > do, e.g. the ULR and Dataflow runners). I
                        don't think we want to
                        > ossify this decision as part of the spec.
                        (Note that even what's
                        > "known" and "unknown" can change from
                        runner to runner.)
                        >
                        >>>>     b) do we think, that it will not be
                        robust enough to incorporate the
                        >>>> other use-cases (like generic transform
                        inlining, taking into account
                        >>>> that this applies only to runners that
                        are written in the same language
                        >>>> as the submitting SDK, because
                        otherwise, there is nothing to inline)?
                        >>> Being in the same language is not a
                        prerequisite to "inlining," e.g.
                        >>> the PubSub source on Dataflow is
                        recognized as such and not executed
                        >>> as SDK code but natively.
                        >> Agree, that is actually exactly what
                        happens with the TestStream. The
                        >> transform need not be in the same
                        language, as long as it is completely
                        >> understood by the runner, including the
                        SDK-coder (either explicitly -
                        >> which might be due to the PCollection
                        coder being composed of well-known
                        >> coders only, or implicitly like in the
                        case of TestStream, where the
                        >> elements are encoded using the SDK coder).
                        >>> It is more likely that inlining occurs
                        in the same language if there
                        >>> are UDFs involved.
                        >>>
                        >>>> I'm convinced, that the
                        TestStream-decode expansion solution is an
                        >>>> ad-hoc solution to a generic problem,
                        which is why I'm still bothering
                        >>>> this mailing list with my emails on
                        this. :-)
                        >>>>
                        >>>> WDYT?
                        >>> While not a solution to the general
                        problem, I think the
                        >>> TestStream-only-does-bytes simplifies
                        its definition (primitives
                        >>> should have as simple/easy to implement
                        definitions as possible) and
                        >>> brings it closer to the other root we
                        have: Impulse. (We could go a
                        >>> step further and rather than emitting
                        encoded elements, with the data
                        >>> in the proto itself, it emits sequence
                        numbers, and a subsequent ParDo
                        >>> maps those to concrete elements (e.g.
                        via an in-memory map), but that
                        >>> further step doesn't buy much...)
                        >>>
                        >>> Only runners that want to do inlining
                        would have to take on the
                        >>> complexity of a fully generic solution.
                        >> I think that if the simplification brings
                        something, we can do that, but
                        >> I'd like to understand why we cannot (or
                        should not) use the generic
                        >> solution. I think it definitely *should*
                        be possible to use a generic
                        >> solution, because otherwise the solution
                        would not be generic. And it
                        >> would imply, that we are unable to do
                        generic transform inlining, which
                        >> I would find really strange. That would
                        immediately mean, that we are
                        >> unable to construct classical runner as a
                        special case of the portable
                        >> one, which would be bad I think.
                        >>
                        >> The elements in the TestStreamPayload are
                        encoded with pure SDK-coder,
                        >> or does this go through the
                        LengthPrefixUnknownCoders logic? If not,
                        >> then the problem would be there, because
                        that means, that the SDK-coder
                        >> cannot be (implicitly) defined in the
                        runner. If the elements would be
                        >> encoded using LP, then it would be
                        possible to decode them using
                        >> runner-coder and the problem should be
                        solved, or am I still missing
                        >> some key parts?
                        > Yes, the problem is precisely that there
                        are (unspecified) constraints
                        > on the coder used by the
                        TestStreamPayload. Just requiring that it be
                        > length prefixed is not enough, you have to
                        make constraints on
                        > sometimes pushing down the length
                        prefixing if it's a composite (like
                        > a KV) that depend on what the runner is
                        expected to support in terms
                        > of composites and/or the choices it
                        chooses for the channel (and the
                        > runner, not knowing the coder, can't
                        transcode between these choices).
                        >
                        > The simpler solution is to constrain this
                        coder to just be byte[]
                        > rather than let it be a little bit
                        flexible (but not wholly flexible).
                        >
                        > As for a fully generic solution, I think
                        the issue encountered with
                        > inlining Read vs. TestStream are related
                        to this, but not really the
                        > same. With TestStream one has an encoded
                        representation of the
                        > elements provided by the SDK to the
                        Runner and has no SDK
                        > representation/execution whereas with the
                        Reads one has unencoded
                        > elements in hand and a Coder that is
                        understood by both (so long as
                        > the channel can be negotiated correctly).
                        FWIW, I think the proper
                        > solution to inlining a Read (or other
                        Transform that would typically
                        > be executed in the SDK) is to treat it as
                        a special environment (where
                        > we know logically it can work) and then
                        elide, as possible, the
                        > various encodings, grpc calls, etc. that
                        are unneeded as everything is
                        > in process.
                        >
                        >>>> On 9/3/21 7:03 PM, Robert Bradshaw wrote:
                        >>>>> On Fri, Sep 3, 2021 at 2:40 AM Jan
                        Lukavský <[email protected]> wrote:
                        >>>>>> On 9/3/21 1:06 AM, Robert Bradshaw wrote:
                        >>>>>>> On Thu, Sep 2, 2021 at 1:03 AM Jan
                        Lukavský <[email protected]> wrote:
                        >>>>>>>> Hi,
                        >>>>>>>>
                        >>>>>>>> I had some more time thinking about
                        this and I'll try to recap that.
                        >>>>>>>> First some invariants:
                        >>>>>>>>
                        >>>>>>>>       a) each PCollection<T> has
                        actually two coders - an _SDK coder_ and a
                        >>>>>>>> _runner coder_. These coders have
                        the property, that each one can
                        >>>>>>>> _decode_ what the other encoded,
                        but the opposite is not true, the
                        >>>>>>>> coders cannot _encode_ what the
                        other _decoded_ (in general).
                        >>>>>>>>
                        >>>>>>>>       b) when a PCollection<T> is
                        computed inside an environment, the
                        >>>>>>>> elements are encoded using SDK
                        coder on the side of SDK-harness and
                        >>>>>>>> decoded using runner coder after
                        receiving in the runner
                        >>>>>>>>
                        >>>>>>>>       c) under specific
                        circumstances, the encode-decode step can be
                        >>>>>>>> optimized out, that is the case
                        where the SDK coder and all its
                        >>>>>>>> subcoders are all well-known to the
                        runner (at present, that means
                        >>>>>>>> that all the parts are present in the
                        model coders set). The reason for that
                        >>>>>>>> is that in this specific situation
                        runner_decode(sdk_encode(X)) = X.
                        >>>>>>>> This property is essential.
                        >>>>>>> However, in general, X can only pass
                        from the SDK to the runner (or
                        >>>>>>> vice versa) in encoded form.
                        >>>>>> In general yes, but we are (mostly)
                        talking about transform inlining here, so
                        >>>>>> in that particular situation, the
                        elements might be passed in decoded form.
                        >>>>>>>>       d) from b) immediately
                        follows, that when a PTransform does not run in
                        >>>>>>>> an environment (and this might be
                        due to the transform being runner
                        >>>>>>>> native, inlined, source (e.g.
                        Impulse or TestStream)) the elements have
                        >>>>>>>> to be encoded by SDK coder,
                        immediately following decode by runner
                        >>>>>>>> coder. That (surprisingly) applies
                        even to situations when runner is
                        >>>>>>>> implemented using different
                        language than the client SDK, because it
                        >>>>>>>> implies that the type of produced
                        elements must be one of types encoded
                        >>>>>>>> using model coders (well-known to
                        the runner, otherwise the SDK will not
                        >>>>>>>> be able to consume it). But - due
                        to property c) - this means that this
                        >>>>>>>> encode-decode step can be optimized
                        out. This does not mean that it is
                        >>>>>>>> not (logically) present, though.
                        This is exactly the case of native
                        >>>>>>>> Impulse transform.
                        >>>>>>>>
                        >>>>>>>> Now, from that we can conclude that
                        on the boundary between executable
                        >>>>>>>> stages, or between runner (inlined)
                        transform and executable stage, each
                        >>>>>>>> PCollection has to be encoded using
                        SDK coder and immediately decoded by
                        >>>>>>>> runner coder, *unless this can be
                        optimized out* by property c).
                        >>>>>>>>
                        >>>>>>>> This gives us two options for where to implement this
                        >>>>>>>> encode/decode step:
                        >>>>>>>>
                        >>>>>>>>       1) completely inside the runner, with the
                        >>>>>>>> possibility of optimizing the encode/decode step into
                        >>>>>>>> an identity under the right circumstances
                        >>>>>>>>
                        >>>>>>>>       2) partly in the runner and partly in the SDK -
                        >>>>>>>> that is, we encode the elements of the PCollection into
                        >>>>>>>> bytes using the SDK coder, pass those to the SDK
                        >>>>>>>> harness, and apply a custom decode step there. This
                        >>>>>>>> works because SDK-coder-encoded elements are byte[],
                        >>>>>>>> which is a well-known coder type. We again only
                        >>>>>>>> leverage property c) and optimize out the SDK-coder
                        >>>>>>>> encode / runner decode step.
                        >>>>>>>>
                        >>>>>>>> Option 2) is exactly the proposal of TestStream
                        >>>>>>>> producing byte[] and decoding inside the SDK harness:
                        >>>>>>>> TestStream is actually an inlined transform, and the
                        >>>>>>>> elements are produced directly in the runner (the SDK
                        >>>>>>>> coder is not known to the runner, but that does not
                        >>>>>>>> matter, because the elements are already encoded by the
                        >>>>>>>> client).
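Editorial illustration of option 2): a minimal sketch of the data flow, assuming a toy length-prefixed framing for the channel. The names `point_encode`, `point_decode`, `runner_frame` and `harness_decode` are hypothetical and are not Beam APIs; the point is only that the runner handles opaque bytes while the custom decode runs in the SDK harness.

```python
import struct
from typing import Callable, List, Tuple


def encode_varint(n: int) -> bytes:
    """Encode a non-negative integer as a base-128 varint."""
    if n < 0:
        raise ValueError("varint must be non-negative")
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def decode_varint(buf: bytes, pos: int = 0) -> Tuple[int, int]:
    """Decode a varint starting at pos; return (value, next position)."""
    result, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


# Hypothetical SDK-only coder; the runner never sees these two functions.
def point_encode(point: Tuple[int, int]) -> bytes:
    return struct.pack(">ii", *point)


def point_decode(data: bytes) -> Tuple[int, int]:
    return struct.unpack(">ii", data)


def runner_frame(encoded_elements: List[bytes]) -> bytes:
    """Runner side: treat each element as opaque bytes and length-prefix it."""
    return b"".join(encode_varint(len(e)) + e for e in encoded_elements)


def harness_decode(stream: bytes, decode: Callable[[bytes], object]) -> list:
    """SDK harness side: strip the framing, then apply the custom decode step."""
    pos, out = 0, []
    while pos < len(stream):
        length, pos = decode_varint(stream, pos)
        out.append(decode(stream[pos:pos + length]))
        pos += length
    return out


# The client encodes the elements up front; the runner only frames the bytes.
points = [(1, -2), (300, 4)]
stream = runner_frame([point_encode(p) for p in points])
decoded = harness_decode(stream, point_decode)
```

In this sketch the runner needs only the bytes framing, which is the sense in which the SDK coder "does not matter" to it.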
                        >>>>>>>>
                        >>>>>>>>      From the above it seems to me that option 1)
                        >>>>>>>> should be preferred, because:
                        >>>>>>>>
                        >>>>>>>>       i) it is generic, applicable to all inlined
                        >>>>>>>> transforms and any sources
                        >>>>>>>>
                        >>>>>>>>       ii) it is consistent with how things logically
                        >>>>>>>> work underneath
                        >>>>>>>>
                        >>>>>>>>       iii) it offers more room for optimization -
                        >>>>>>>> option 2) might result in cases where the elements are
                        >>>>>>>> passed from the runner to the SDK harness only to be
                        >>>>>>>> decoded from the SDK coder, immediately encoded back
                        >>>>>>>> using the SDK coder, and returned to the runner. This
                        >>>>>>>> would be the case when TestStream is directly consumed
                        >>>>>>>> by an inlined (or external) transform.
                        >>>>>>> (1) is not possible if the Coder in question is not
                        >>>>>>> known to the Runner, which is why I proposed (2).
                        >>>>>> There is no particular need for the coder to be known.
                        >>>>>> If a transform is to be inlined, what *has* to be known
                        >>>>>> is the SDK-encoded form of the elements. That holds true
                        >>>>>> if:
                        >>>>>>
                        >>>>>>   a) either the SDK coder is known, or
                        >>>>>>
                        >>>>>>   b) the encoded form of the produced elements is known
                        >>>>>> in advance
                        >>>>>>
                        >>>>>> For TestStream it is case b). For an inlined primitive
                        >>>>>> Read (or any other transform which executes code) it is
                        >>>>>> a).
                        >>>>> There's another hitch here for TestStream. For
                        >>>>> historical reasons, coders actually represent two
                        >>>>> encodings: nested (aka self-delimiting) and unnested.
                        >>>>> TestStream elements are given as unnested encoded bytes,
                        >>>>> but the nested encoding is required for sending data to
                        >>>>> the SDK. The runner can't go from <nested encoding> to
                        >>>>> <unnested encoding> for an arbitrary unknown coder.
                        >>>>>
                        >>>>> (Even if it weren't for this complication, being able to
                        >>>>> send already-encoded bytes of an unknown coder to the
                        >>>>> SDK would also complicate the logic of choosing the
                        >>>>> coder to be used for the channel and sending the data,
                        >>>>> which is some of what you're running into (but can be
                        >>>>> solved differently for inlined reads, as the coder can
                        >>>>> always be known by the runner).)
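Editorial illustration of the nested/unnested hitch: a minimal sketch with two toy coders (hypothetical names, not Beam APIs). It assumes a UTF-8 string coder whose nested form adds a varint length prefix, and a varint coder whose output is already self-delimiting, so its nested and unnested forms coincide. The same unnested bytes then map to different nested bytes depending on the coder, which is why a runner cannot convert between the two forms without knowing the coder.

```python
def encode_varint(n: int) -> bytes:
    """Encode a non-negative integer as a base-128 varint."""
    if n < 0:
        raise ValueError("varint must be non-negative")
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


# Toy string coder: the unnested form is the raw UTF-8 payload, the nested
# form needs a length prefix because the payload is not self-delimiting.
def utf8_unnested(s: str) -> bytes:
    return s.encode("utf-8")


def utf8_nested(s: str) -> bytes:
    payload = s.encode("utf-8")
    return encode_varint(len(payload)) + payload


# Toy integer coder: a varint already marks its own end, so both forms match.
def varint_unnested(n: int) -> bytes:
    return encode_varint(n)


def varint_nested(n: int) -> bytes:
    return encode_varint(n)


# Identical unnested bytes, different nested bytes.
same_unnested = utf8_unnested("a") == varint_unnested(97)
different_nested = utf8_nested("a") != varint_nested(97)
```

Given only the unnested bytes of an element, nothing in the bytes themselves says whether a length prefix has to be added.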



--
                        

                Jack McCluskey
                SWE - DataPLS PLAT/ Beam Go
                RDU
                [email protected] <mailto:[email protected]>

