Go also doesn't do this, instead maintaining a list of known standard
coders and using that as a list of capabilities:
https://github.com/apache/beam/blob/4bc870806099f03265e7dfb48b142f00cee42f47/sdks/go/pkg/beam/core/runtime/graphx/coder.go#L59

On Thu, Sep 9, 2021 at 1:29 PM Robert Bradshaw <[email protected]> wrote:

> You are right, Java does not implement this correctly. It should be
> querying the capabilities section of the environment proto. (For java
> environments, this is populated from ModelCoders, e.g.
> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java#L386
> )
>
> Looks like Python doesn't do any better:
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L459
>
>
>
> On Thu, Sep 9, 2021 at 10:13 AM Jan Lukavský <[email protected]> wrote:
>
>> This makes a *lot* of sense. But it seems to me, that this is not the way
>> Java-based runners - that use runners-core-construction-java module -
>> handle it. If I interpret it correctly, then the set of ModelCoders is
>> hard-coded [1] and essentially required to be known by all SDKs [2].
>>
>> There seems to be no negotiation between what SDK harness knows and what
>> the runner knows. The runner might be able to define the wire coder for the
>> SDK (via the ProcessBundleDescriptor), but the SDK (for Java runners) seems
>> not to be able to play any role in this [3]. Therefore I think that if an
>> SDK does not know the set of Java ModelCoders, then the runner and the SDK
>> might not agree on the binary encoding of the elements (BTW, which is why
>> Java Coders cannot be part of ModelCoders and I finally understand why I
>> had such troubles adding it there - it cannot be there).
>>
>> Is it possible we are missing some part of this runner-to-sdk coder
>> negotiation in runners-core-contruction-java?
>>
>> [1]
>> https://github.com/apache/beam/blob/a871a494a9935093ab5f42d88d584d32aa233b8a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java#L75
>>
>> [2]
>> https://github.com/apache/beam/blob/a871a494a9935093ab5f42d88d584d32aa233b8a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L62
>>
>> [3]
>> https://github.com/apache/beam/blob/a871a494a9935093ab5f42d88d584d32aa233b8a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L104
>> On 9/9/21 12:18 AM, Robert Bradshaw wrote:
>>
>> The whole notion of an absolute set of known coders is a misnomer. It
>> would require all Runners and SDKs to be updated synchronously for every
>> new coder they might want to share.
>>
>> Instead, what we have are
>>
>> * Standard Coders which have well-defined, language-agnostic
>> representations and encodings, which may be used for interoperability and
>> efficiency, and
>> * Required Coders which are the minimum needed to execute the pipeline.
>>
>> The latter consists only of bytes (for impulse), kv and iterable (for
>> GBK), windowed value (for windowing information) and length prefix (to be
>> able to handle anything else).
>>
>>
>> On Wed, Sep 8, 2021 at 3:03 PM Robert Burke <[email protected]> wrote:
>>
>>> Is the claim that the Standard bytes and String_utf8 coders are not
>>> "known coders"?
>>>
>>>  What's the point of the standard coders if they are not the canonical
>>> "known coders" that can generally be expected to be known by runners/other
>>> SDKs?
>>>
>>>
>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L790
>>>
>>> The Go SDK rather heavily biases towards using the standard coders for
>>> their closes language equivalents rather than going into override/custom
>>> specified soup. (It's not possible to globally override the coders for the
>>> '[]byte' and 'string' types, nor is there often reason to.)
>>>
>>> On Wed, Sep 8, 2021, 2:56 PM Robert Bradshaw <[email protected]>
>>> wrote:
>>>
>>>> On Wed, Sep 8, 2021 at 1:48 PM Jack McCluskey <[email protected]>
>>>> wrote:
>>>>
>>>>> Hey all,
>>>>>
>>>>> Just catching up on the thread since I did the TestStream Go SDK
>>>>> implementation. The discussion about length prefixing behavior for known
>>>>> vs. unknown coders is interesting, since we ran into strings and byte
>>>>> slices getting extra length prefixes attached to them by Flink despite
>>>>> being known coders.
>>>>>
>>>>
>>>> Known to who?
>>>>
>>>>
>>>>> Based on what's been said, that isn't expected behavior, right?
>>>>>
>>>>
>>>> No, it's not.
>>>>
>>>> I would check to make sure the Go SDK is respecting the Coder (length
>>>> prefixed or not) that's set on the channel, rather than guessing at what it
>>>> expects it to be based on the Go type.
>>>>
>>>>
>>>>> On Tue, Sep 7, 2021 at 2:46 PM Jan Lukavský <[email protected]> wrote:
>>>>>
>>>>>> On 9/7/21 6:02 PM, Reuven Lax wrote:
>>>>>>
>>>>>> Historically the DataflowRunner has been much more careful about not
>>>>>> breaking update, since this is a frequent operation by Dataflow users. I
>>>>>> think we've been less careful aboutt other runners, but as we see clearly
>>>>>> here Fllnk users do care about this as well, so we should probably test
>>>>>> upgrade compatibility for Flink.
>>>>>>
>>>>>> One strategy that Dataflow uses is to avoid embedding the Java
>>>>>> serialized form of a Coder in the graph, as this is a much higher risk of
>>>>>> breakage (as we see with the issue you llnked to). Possibly similar
>>>>>> strategies should be investigated for Fllink.
>>>>>>
>>>>>> +1, that would be great!
>>>>>>
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Mon, Sep 6, 2021 at 1:29 AM Jan Lukavský <[email protected]> wrote:
>>>>>>
>>>>>>> > Unfortunately the most basic coders (e.g. bytes, string, kv,
>>>>>>> iterable)
>>>>>>> > care about Context because they predated this deprecation, and
>>>>>>> > changing coders is hard (due to no way to update the encoding for a
>>>>>>> > streaming pipeline).
>>>>>>> This is unrelated, but - regarding changing coders due to concerns
>>>>>>> about
>>>>>>> pipeline upgrades, we break this quite often, at least for some
>>>>>>> runners.
>>>>>>> Most recently [1].
>>>>>>>
>>>>>>> > It is currently the latter for runners using this code (which not
>>>>>>> all
>>>>>>> > do, e.g. the ULR and Dataflow runners). I don't think we want to
>>>>>>> > ossify this decision as part of the spec. (Note that even what's
>>>>>>> > "known" and "unknown" can change from runner to runner.)
>>>>>>> This is interesting and unexpected for me. How do runners decide
>>>>>>> about
>>>>>>> how they encode elements between SDK harness and the runner? How do
>>>>>>> they
>>>>>>> inform the SDK harness about this decision? My impression was that
>>>>>>> this
>>>>>>> is well-defined at the model level. If not, then we have the reason
>>>>>>> for
>>>>>>> misunderstanding in this conversation. :-)
>>>>>>>
>>>>>>>   Jan
>>>>>>>
>>>>>>> [1]
>>>>>>>
>>>>>>> https://lists.apache.org/thread.html/r51ee0bbaba2dcef13524a189c1f579f209483418a1568acff0e2c789%40%3Cdev.beam.apache.org%3E
>>>>>>>
>>>>>>> On 9/4/21 7:32 PM, Robert Bradshaw wrote:
>>>>>>> > On Sat, Sep 4, 2021 at 6:52 AM Jan Lukavský <[email protected]>
>>>>>>> wrote:
>>>>>>> >> On 9/3/21 9:50 PM, Robert Bradshaw wrote:
>>>>>>> >>
>>>>>>> >>> On Fri, Sep 3, 2021 at 11:42 AM Jan Lukavský<[email protected]>
>>>>>>> wrote:
>>>>>>> >>>> Hi Robert,
>>>>>>> >>>>
>>>>>>> >>>>> There's another hitch here for TestStream. For historical
>>>>>>> reasons,
>>>>>>> >>>>> coders actually represent two encodings: nested (aka self
>>>>>>> delimiting)
>>>>>>> >>>>> and unnested. TestStream elements are given as unnested
>>>>>>> encoded bytes,
>>>>>>> >>>>> but the nested encoding is required for sending data to the
>>>>>>> SDK. The
>>>>>>> >>>>> runner can't go from <nested encoding> to <unnested encoding>
>>>>>>> for an
>>>>>>> >>>>> arbitrary unknown coder.
>>>>>>> >>>>>
>>>>>>> >>>>> (Even if it weren't for this complication, to be able to send
>>>>>>> already
>>>>>>> >>>>> encoded bytes of an unknown coder to the SDK will also
>>>>>>> complicate the
>>>>>>> >>>>> logic in choosing the coder to be used for the channel and
>>>>>>> sending the
>>>>>>> >>>>> data, which is some of what you're running into (but can be
>>>>>>> solved
>>>>>>> >>>>> differently for inlined reads as the coder can always be known
>>>>>>> by the
>>>>>>> >>>>> runner).)
>>>>>>> >>>> It is hard for me to argue with "historical reasons". But - the
>>>>>>> "nested"
>>>>>>> >>>> and "unnested" coders look very similar to SDK-coder and
>>>>>>> runner-coder
>>>>>>> >>>> spaces.
>>>>>>> >>> Unfortunately, they're actually orthogonal to that.
>>>>>>> >> Hm, do you mean the Context passed to the encode/decode method?
>>>>>>> [1] That
>>>>>>> >> seems to be deprecated, I assume that most coders use the default
>>>>>>> >> implementation and simply ignore the Context?
>>>>>>> > Unfortunately the most basic coders (e.g. bytes, string, kv,
>>>>>>> iterable)
>>>>>>> > care about Context because they predated this deprecation, and
>>>>>>> > changing coders is hard (due to no way to update the encoding for a
>>>>>>> > streaming pipeline).
>>>>>>> >
>>>>>>> >> Even if not - whether or
>>>>>>> >> not the elements are encoded using NESTED Context or UNNESTED
>>>>>>> Context
>>>>>>> >> should be part of the contract of TestStream, right? Most likely
>>>>>>> it is
>>>>>>> >> the UNNESTED one, if I understand correctly what that does. Under
>>>>>>> what
>>>>>>> >> conditions is the deprecated encode/decode method used?
>>>>>>> > Yes, it's the UNNESTED one.
>>>>>>> >
>>>>>>> >> [1]
>>>>>>> >>
>>>>>>> https://github.com/apache/beam/blob/c4e0b4ac0777f37f5eb775a8a83c56f66b3baac3/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L134
>>>>>>> >>
>>>>>>> >>>> The runner's responsibility is not to go from "<nested
>>>>>>> >>>> encoding>" (SDK coder) to "<unnested encoding>" for arbitrary
>>>>>>> coder.
>>>>>>> >>>> That is really impossible. But a coder is a function, right?
>>>>>>> Function
>>>>>>> >>>> maps from universe A to universe B (in general). TestStream
>>>>>>> provides a
>>>>>>> >>>> set of elements, and these elements are the "universe". For
>>>>>>> those
>>>>>>> >>>> elements it also provides the encoded form, which can be
>>>>>>> interpreted as
>>>>>>> >>>> the definition of the coder.
>>>>>>> >>> The problem here is that there is not "the encoded form" for a
>>>>>>> Coder
>>>>>>> >>> but two encoded forms, and we have the wrong one. Things could
>>>>>>> be made
>>>>>>> >>> to work if we had the other.
>>>>>>> >> Which two encoded forms do you refer to? Elements encoded by both
>>>>>>> the
>>>>>>> >> SDK-coder and runner-coder (and I ignore the Context here once
>>>>>>> again)
>>>>>>> >> have the same binary representation (which they must have,
>>>>>>> otherwise it
>>>>>>> >> would be impossible to decode elements coming from the runner to
>>>>>>> the
>>>>>>> >> SDK-harness or vice-versa).
>>>>>>> >>>> Therefore - technically (and formally) -
>>>>>>> >>>> the SDK coder for the TestStream is known to the runner,
>>>>>>> regardless of
>>>>>>> >>>> the language the runner is written in.
>>>>>>> >>>>
>>>>>>> >>>> To move  this discussion forward, I think we should look for
>>>>>>> answers to
>>>>>>> >>>> the following questions:
>>>>>>> >>>>
>>>>>>> >>>>     a) do we have any clues that show, that the proposed "in
>>>>>>> runner"
>>>>>>> >>>> solution will not work?
>>>>>>> >>> OK, thinking about it some more, in the TestStream, we can use
>>>>>>> the
>>>>>>> >>> happy coincidence that
>>>>>>> >>>
>>>>>>> >>>       LengthPrefixed(C).encode(x, nested=True) ==
>>>>>>> >>> VarLong.encode(len(C.encode(x, nested=False))) || C.encode(x,
>>>>>>> >>> nested=False)
>>>>>>> >>>
>>>>>>> >>> (where || denotes concatenation) and the fact that we have
>>>>>>> >>>
>>>>>>> >>>       C.encode(x, nested=False)
>>>>>>> >>>
>>>>>>> >>> in hand.
>>>>>>> >>>
>>>>>>> >>> A possible fix here for the OP's question is that when
>>>>>>> rehydrating the
>>>>>>> >>> TestStream transform it must behave differently according to the
>>>>>>> coder
>>>>>>> >>> used in the subsequent channel (e.g. for known coders, it
>>>>>>> decodes the
>>>>>>> >>> elements and emits them directly, but for unknown coders, it
>>>>>>> prefixes
>>>>>>> >>> them with their length and emits byte strings. It gets more
>>>>>>> >>> complicated for nested coders, e.g. for a KV<known-coder,
>>>>>>> >>> unknown-coder> the channel might be LP(KV<known-coder,
>>>>>>> unknown-coder))
>>>>>>> >>> or KV<known-coder, LP(unknown-coder)) which have different
>>>>>>> encodings
>>>>>>> >>> (and the latter, which is the default, requires transcoding the
>>>>>>> bytes
>>>>>>> >>> to inject the length in the middle which is found by decoding the
>>>>>>> >>> first component). As well as getting more complex, this really
>>>>>>> seems
>>>>>>> >>> to violate the spirit of separation of concerns.
>>>>>>> >> How do we make the decision if the channel is LP<KV<..>> or
>>>>>>> >> KV<LP<unknown>, known>? From my understanding it is always the
>>>>>>> latter,
>>>>>>> >> because of [2].
>>>>>>> >>
>>>>>>> >> [2]
>>>>>>> >>
>>>>>>> https://github.com/apache/beam/blob/c4e0b4ac0777f37f5eb775a8a83c56f66b3baac3/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L48
>>>>>>> > It is currently the latter for runners using this code (which not
>>>>>>> all
>>>>>>> > do, e.g. the ULR and Dataflow runners). I don't think we want to
>>>>>>> > ossify this decision as part of the spec. (Note that even what's
>>>>>>> > "known" and "unknown" can change from runner to runner.)
>>>>>>> >
>>>>>>> >>>>     b) do we think, that it will not be robust enough to
>>>>>>> incorporate the
>>>>>>> >>>> other use-cases (line generic transform inlining, taking into
>>>>>>> account
>>>>>>> >>>> that this applies only to runners that are written in the same
>>>>>>> language
>>>>>>> >>>> as the submitting SDK, because otherwise, there is nothing to
>>>>>>> inline)?
>>>>>>> >>> Being in the same language is not a prerequisite to "inlining,"
>>>>>>> e.g.
>>>>>>> >>> the PubSub source on Dataflow is recognized as such and not
>>>>>>> executed
>>>>>>> >>> as SDK code but natively.
>>>>>>> >> Agree, that is actually exactly what happens with the TestStream.
>>>>>>> The
>>>>>>> >> transform need not be in the same language, as long as it is
>>>>>>> completely
>>>>>>> >> understood by the runner, including the SDK-coder (either
>>>>>>> explicitly -
>>>>>>> >> which might be due to the PCollection coder being composed of
>>>>>>> well-known
>>>>>>> >> coders only, or implicitly like in the case of TestStream, where
>>>>>>> the
>>>>>>> >> elements are encoded using the SDK coder.
>>>>>>> >>> It is more likely that inlining occurs in the same language if
>>>>>>> there
>>>>>>> >>> are UDFs involved.
>>>>>>> >>>
>>>>>>> >>>> I'm convinced, that the TestStream-decode expansion solution is
>>>>>>> an
>>>>>>> >>>> ad-hoc solution to a generic problem, which is why I'm still
>>>>>>> bothering
>>>>>>> >>>> this mailing list with my emails on this. :-)
>>>>>>> >>>>
>>>>>>> >>>> WDYT?
>>>>>>> >>> While not a solution to the general problem, I think the
>>>>>>> >>> TestStream-only-does-bytes simplifies its definition (primitives
>>>>>>> >>> should have as simple/easy to implement definitions as possible)
>>>>>>> and
>>>>>>> >>> brings it closer to the other root we have: Impulse. (We could
>>>>>>> go a
>>>>>>> >>> step further and rather than emitting encoded elements, with the
>>>>>>> data
>>>>>>> >>> in the proto itself, it emits sequence numbers, and a subsequent
>>>>>>> ParDo
>>>>>>> >>> maps those to concrete elements (e.g. via an in-memory map), but
>>>>>>> that
>>>>>>> >>> further step doesn't buy much...)
>>>>>>> >>>
>>>>>>> >>> Only runners that want to do inlining would have to take on the
>>>>>>> >>> complexity of a fully generic solution.
>>>>>>> >> I think that if the simplification brings something, we can do
>>>>>>> that, but
>>>>>>> >> I'd like to understand why we cannot (or should not) use the
>>>>>>> generic
>>>>>>> >> solution. I think it definitely *should* be possible to use a
>>>>>>> generic
>>>>>>> >> solution, because otherwise the solution would not be generic.
>>>>>>> And it
>>>>>>> >> would imply, that we are unable to do generic transform inlining,
>>>>>>> which
>>>>>>> >> I would find really strange. That would immediately mean, that we
>>>>>>> are
>>>>>>> >> unable to construct classical runner as a special case of the
>>>>>>> portable
>>>>>>> >> one, which would be bad I think.
>>>>>>> >>
>>>>>>> >> The elements in the TestStreamPayload are encoded with pure
>>>>>>> SDK-coder,
>>>>>>> >> or does this go through the LengthPrefixUnknownCoders logic? If
>>>>>>> not,
>>>>>>> >> then the problem would be there, because that means, that the
>>>>>>> SDK-coder
>>>>>>> >> cannot be (implicitly) defined in the runner. If the elements
>>>>>>> would be
>>>>>>> >> encoded using LP, then it would be possible to decode them using
>>>>>>> >> runner-coder and the problem should be solved, or am I still
>>>>>>> missing
>>>>>>> >> some key parts?
>>>>>>> > Yes, the problem is precisely that there are (unspecified)
>>>>>>> constraints
>>>>>>> > on the coder used by the TestStreamPayload. Just requiring that it
>>>>>>> be
>>>>>>> > length prefixed is not enough, you have to make constraints on
>>>>>>> > sometimes pushing down the length prefixing if it's a composite
>>>>>>> (like
>>>>>>> > a KV) that depend on what the runner is expected to support in
>>>>>>> terms
>>>>>>> > of composites and/or the choices it chooses for the channel (and
>>>>>>> the
>>>>>>> > runner, not knowing the coder, can't transcode between these
>>>>>>> choices).
>>>>>>> >
>>>>>>> > The simpler solution is to constrain this coder to just be byte[]
>>>>>>> > rather than let it be a little bit flexible (but not wholly
>>>>>>> flexible).
>>>>>>> >
>>>>>>> > As for a fully generic solution, I think the issue encountered with
>>>>>>> > inlining Read vs. TestStream are related to this, but not really
>>>>>>> the
>>>>>>> > same. With TestStream one has an encoded representation of the
>>>>>>> > elements provided by the SDK that the Runner and has no SDK
>>>>>>> > representation/execution whereas with the Reads one has unencoded
>>>>>>> > elements in hand and a Coder that is understood by both (so long as
>>>>>>> > the channel can be negotiated correctly). FWIW, I think the proper
>>>>>>> > solution to inlining a Read (or other Transform that would
>>>>>>> typically
>>>>>>> > be executed in the SDK) is to treat it as a special environment
>>>>>>> (where
>>>>>>> > we know logically it can work) and then elide, as possible, the
>>>>>>> > various encodings, grpc calls, etc. that are unneeded as
>>>>>>> everything is
>>>>>>> > in process.
>>>>>>> >
>>>>>>> >>>> On 9/3/21 7:03 PM, Robert Bradshaw wrote:
>>>>>>> >>>>> On Fri, Sep 3, 2021 at 2:40 AM Jan Lukavský<[email protected]>
>>>>>>> wrote:
>>>>>>> >>>>>> On 9/3/21 1:06 AM, Robert Bradshaw wrote:
>>>>>>> >>>>>>> On Thu, Sep 2, 2021 at 1:03 AM Jan Lukavský<[email protected]>
>>>>>>> wrote:
>>>>>>> >>>>>>>> Hi,
>>>>>>> >>>>>>>>
>>>>>>> >>>>>>>> I had some more time thinking about this and I'll try to
>>>>>>> recap that.
>>>>>>> >>>>>>>> First some invariants:
>>>>>>> >>>>>>>>
>>>>>>> >>>>>>>>       a) each PCollection<T> has actually two coders - an
>>>>>>> _SDK coder_ and a
>>>>>>> >>>>>>>> _runner coder_. These coders have the property, that each
>>>>>>> one can
>>>>>>> >>>>>>>> _decode_ what the other encoded, but the opposite is not
>>>>>>> true, the
>>>>>>> >>>>>>>> coders cannot _encode_ what the other _decoded_ (in
>>>>>>> general).
>>>>>>> >>>>>>>>
>>>>>>> >>>>>>>>       b) when is a PCollection<T> computed inside an
>>>>>>> environment, the
>>>>>>> >>>>>>>> elements are encoded using SDK coder on the side of
>>>>>>> SDK-harness and
>>>>>>> >>>>>>>> decoded using runner coder after receiving in the runner
>>>>>>> >>>>>>>>
>>>>>>> >>>>>>>>       c) under specific circumstances, the encode-decode
>>>>>>> step can be
>>>>>>> >>>>>>>> optimized out, that is the case where the SDK coder and all
>>>>>>> its
>>>>>>> >>>>>>>> subcoders are all well-known to the runner (in the present,
>>>>>>> that means
>>>>>>> >>>>>>>> that all the parts present in the model coders set). The
>>>>>>> reason for that
>>>>>>> >>>>>>>> is that in this specific situation
>>>>>>> runner_decode(sdk_encode(X)) = X.
>>>>>>> >>>>>>>> This property is essential.
>>>>>>> >>>>>>> However, in general, X can only pass from the SDK to the
>>>>>>> runner (or
>>>>>>> >>>>>>> vice versa) in encoded form.
>>>>>>> >>>>>> In general yes, but we are (mostly) talking transform
>>>>>>> inlining here, so
>>>>>>> >>>>>> it that particular situation, the elements might be passed in
>>>>>>> decoded form.
>>>>>>> >>>>>>>>       d) from b) immediately follows, that when a
>>>>>>> PTransform does not run in
>>>>>>> >>>>>>>> an environment (and this might be due to the transform
>>>>>>> being runner
>>>>>>> >>>>>>>> native, inlined, source (e.g. Impulse or TestStream)) the
>>>>>>> elements have
>>>>>>> >>>>>>>> to be encoded by SDK coder, immediately following decode by
>>>>>>> runner
>>>>>>> >>>>>>>> coder. That (surprisingly) applies even to situations when
>>>>>>> runner is
>>>>>>> >>>>>>>> implemented using different language than the client SDK,
>>>>>>> because it
>>>>>>> >>>>>>>> implies that the type of produced elements must be one of
>>>>>>> types encoded
>>>>>>> >>>>>>>> using model coders (well-known to the runner, otherwise the
>>>>>>> SDK will not
>>>>>>> >>>>>>>> be able to consume it). But - due to property c) - this
>>>>>>> means that this
>>>>>>> >>>>>>>> encode-decode step can be optimized out. This does not mean
>>>>>>> that it is
>>>>>>> >>>>>>>> not (logically) present, though. This is exactly the case
>>>>>>> of native
>>>>>>> >>>>>>>> Impulse transform.
>>>>>>> >>>>>>>>
>>>>>>> >>>>>>>> Now, from that we can conclude that on the boundary between
>>>>>>> executable
>>>>>>> >>>>>>>> stages, or between runner (inlined) transform and
>>>>>>> executable stage, each
>>>>>>> >>>>>>>> PCollection has to be encoded using SDK coder and
>>>>>>> immediately decoded by
>>>>>>> >>>>>>>> runner coder, *unless this can be optimized out* by
>>>>>>> property c).
>>>>>>> >>>>>>>>
>>>>>>> >>>>>>>> This gives us two options where to implement this
>>>>>>> encode/decode step:
>>>>>>> >>>>>>>>
>>>>>>> >>>>>>>>       1) completely inside runner with the possibility to
>>>>>>> optimize the
>>>>>>> >>>>>>>> encode/decode step by identity under right circumstances
>>>>>>> >>>>>>>>
>>>>>>> >>>>>>>>       2) partly in the runner and partly in the SDK - that
>>>>>>> is we encode
>>>>>>> >>>>>>>> elements of PCollection using SDK coder into bytes, pass
>>>>>>> those to the
>>>>>>> >>>>>>>> SDK harness and apply a custom decode step there. This
>>>>>>> works because SDK
>>>>>>> >>>>>>>> coder encoded elements are in byte[], and that is
>>>>>>> well-known coder type.
>>>>>>> >>>>>>>> We again only leverage property c) and optimize the SDK
>>>>>>> coder encode,
>>>>>>> >>>>>>>> runner decode step out.
>>>>>>> >>>>>>>>
>>>>>>> >>>>>>>> The option 2) is exactly the proposal of TestStream
>>>>>>> producing byte[] and
>>>>>>> >>>>>>>> decoding inside SDK-harness, the TestStream is actually
>>>>>>> inlined
>>>>>>> >>>>>>>> transform, the elements are produced directly in runner
>>>>>>> (the SDK coder
>>>>>>> >>>>>>>> is not known to the runner, but that does not matter,
>>>>>>> because the
>>>>>>> >>>>>>>> elements are already encoded by client).
>>>>>>> >>>>>>>>
>>>>>>> >>>>>>>>      From the above it seems to me, that option 1) should
>>>>>>> be preferred, because:
>>>>>>> >>>>>>>>
>>>>>>> >>>>>>>>       i) it is generic, applicable to all inlined
>>>>>>> transforms, any sources
>>>>>>> >>>>>>>>
>>>>>>> >>>>>>>>       ii) it is consistent with how things logically work
>>>>>>> underneath
>>>>>>> >>>>>>>>
>>>>>>> >>>>>>>>       iii) it offers better room for optimization - option
>>>>>>> 2) might result
>>>>>>> >>>>>>>> in cases when the elements are passed from the runner to
>>>>>>> the SDK-harness
>>>>>>> >>>>>>>> only for the sake of the decoding from SDK coder and
>>>>>>> immediately
>>>>>>> >>>>>>>> encoding back using SDK-coder and returned back to the
>>>>>>> runner. This
>>>>>>> >>>>>>>> would be the case when TestStream would be directly
>>>>>>> consumed by inlined
>>>>>>> >>>>>>>> (or external) transform.
>>>>>>> >>>>>>> (1) is not possible if the Coder in question is not known to
>>>>>>> the
>>>>>>> >>>>>>> Runner, which is why I proposed (2).
>>>>>>> >>>>>> There is no particular need for the coder to be known. If
>>>>>>> transform is
>>>>>>> >>>>>> to be inlined, what *has* to be known is the SDK-encoded form
>>>>>>> of the
>>>>>>> >>>>>> elements. That holds true if:
>>>>>>> >>>>>>
>>>>>>> >>>>>>      a) either the SDK coder is known, or
>>>>>>> >>>>>>
>>>>>>> >>>>>>      b) encoded form of the produced elements is known in
>>>>>>> advance
>>>>>>> >>>>>>
>>>>>>> >>>>>> For TestStream it is the case b). For inlined primitive Read
>>>>>>> (or any
>>>>>>> >>>>>> other transform which executes code) it is a).
>>>>>>> >>>>> There's another hitch here for TestStream. For historical
>>>>>>> reasons,
>>>>>>> >>>>> coders actually represent two encodings: nested (aka self
>>>>>>> delimiting)
>>>>>>> >>>>> and unnested. TestStream elements are given as unnested
>>>>>>> encoded bytes,
>>>>>>> >>>>> but the nested encoding is required for sending data to the
>>>>>>> SDK. The
>>>>>>> >>>>> runner can't go from <nested encoding> to <unnested encoding>
>>>>>>> for an
>>>>>>> >>>>> arbitrary unknown coder.
>>>>>>> >>>>>
>>>>>>> >>>>> (Even if it weren't for this complication, to be able to send
>>>>>>> already
>>>>>>> >>>>> encoded bytes of an unknown coder to the SDK will also
>>>>>>> complicate the
>>>>>>> >>>>> logic in choosing the coder to be used for the channel and
>>>>>>> sending the
>>>>>>> >>>>> data, which is some of what you're running into (but can be
>>>>>>> solved
>>>>>>> >>>>> differently for inlined reads as the coder can always be known
>>>>>>> by the
>>>>>>> >>>>> runner).)
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>>
>>>>>
>>>>> Jack McCluskey
>>>>> SWE - DataPLS PLAT/ Beam Go
>>>>> RDU
>>>>> [email protected]
>>>>>
>>>>>
>>>>>

Reply via email to