Go also doesn't do this, instead maintaining a list of known standard coders and using that as a list of capabilities: https://github.com/apache/beam/blob/4bc870806099f03265e7dfb48b142f00cee42f47/sdks/go/pkg/beam/core/runtime/graphx/coder.go#L59
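An illustrative aside on the capability-based approach described above: the following rough Java sketch (the CoderNode type, the fullyUnderstood helper and the hard-coded URN set are made up for the example, not Beam APIs) shows the idea of walking a coder's URN tree and treating the environment's declared capabilities, rather than a global list of "known" coders, as the source of truth; anything not claimed would get length-prefixed.

    import java.util.Arrays;
    import java.util.List;
    import java.util.Set;

    /**
     * Illustrative only: decides whether a coder can be sent to an environment
     * as-is, or must have itself (or some component) wrapped in a length prefix,
     * based on the coder URNs the environment advertises as capabilities.
     */
    public class CapabilityCheck {

      /** A minimal stand-in for the coder entries in the pipeline proto. */
      static class CoderNode {
        final String urn;
        final List<CoderNode> components;

        CoderNode(String urn, CoderNode... components) {
          this.urn = urn;
          this.components = Arrays.asList(components);
        }
      }

      /** True iff the environment claims this coder and all of its components. */
      static boolean fullyUnderstood(CoderNode coder, Set<String> capabilities) {
        if (!capabilities.contains(coder.urn)) {
          return false;
        }
        for (CoderNode component : coder.components) {
          if (!fullyUnderstood(component, capabilities)) {
            return false;
          }
        }
        return true;
      }

      public static void main(String[] args) {
        // Capabilities as an SDK environment might declare them (subset shown).
        Set<String> capabilities =
            Set.of(
                "beam:coder:bytes:v1",
                "beam:coder:kv:v1",
                "beam:coder:iterable:v1",
                "beam:coder:length_prefix:v1",
                "beam:coder:windowed_value:v1",
                "beam:coder:varint:v1",
                "beam:coder:string_utf8:v1");

        CoderNode known =
            new CoderNode("beam:coder:kv:v1",
                new CoderNode("beam:coder:varint:v1"),
                new CoderNode("beam:coder:string_utf8:v1"));
        CoderNode unknown =
            new CoderNode("beam:coder:kv:v1",
                new CoderNode("beam:coder:varint:v1"),
                new CoderNode("my:custom:fancy_coder:v1"));

        System.out.println(fullyUnderstood(known, capabilities));   // true
        System.out.println(fullyUnderstood(unknown, capabilities)); // false -> length-prefix the unknown part
      }
    }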
On Thu, Sep 9, 2021 at 1:29 PM Robert Bradshaw <[email protected]> wrote:

> You are right, Java does not implement this correctly. It should be querying the capabilities section of the environment proto. (For java environments, this is populated from ModelCoders, e.g. https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java#L386 )
>
> Looks like Python doesn't do any better: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L459
>
> On Thu, Sep 9, 2021 at 10:13 AM Jan Lukavský <[email protected]> wrote:
>
>> This makes a *lot* of sense. But it seems to me, that this is not the way Java-based runners - that use the runners-core-construction-java module - handle it. If I interpret it correctly, then the set of ModelCoders is hard-coded [1] and essentially required to be known by all SDKs [2].
>>
>> There seems to be no negotiation between what the SDK harness knows and what the runner knows. The runner might be able to define the wire coder for the SDK (via the ProcessBundleDescriptor), but the SDK (for Java runners) seems not to be able to play any role in this [3]. Therefore I think that if an SDK does not know the set of Java ModelCoders, then the runner and the SDK might not agree on the binary encoding of the elements (BTW, which is why Java Coders cannot be part of ModelCoders, and I finally understand why I had such troubles adding one there - it cannot be there).
>>
>> Is it possible we are missing some part of this runner-to-SDK coder negotiation in runners-core-construction-java?
>>
>> [1] https://github.com/apache/beam/blob/a871a494a9935093ab5f42d88d584d32aa233b8a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java#L75
>> [2] https://github.com/apache/beam/blob/a871a494a9935093ab5f42d88d584d32aa233b8a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L62
>> [3] https://github.com/apache/beam/blob/a871a494a9935093ab5f42d88d584d32aa233b8a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L104
>>
>> On 9/9/21 12:18 AM, Robert Bradshaw wrote:
>>
>> The whole notion of an absolute set of known coders is a misnomer. It would require all Runners and SDKs to be updated synchronously for every new coder they might want to share.
>>
>> Instead, what we have are
>>
>> * Standard Coders, which have well-defined, language-agnostic representations and encodings, which may be used for interoperability and efficiency, and
>> * Required Coders, which are the minimum needed to execute the pipeline.
>>
>> The latter consists only of bytes (for impulse), kv and iterable (for GBK), windowed value (for windowing information) and length prefix (to be able to handle anything else).
>>
>> On Wed, Sep 8, 2021 at 3:03 PM Robert Burke <[email protected]> wrote:
>>
>>> Is the claim that the Standard bytes and String_utf8 coders are not "known coders"?
>>>
>>> What's the point of the standard coders if they are not the canonical "known coders" that can generally be expected to be known by runners/other SDKs?
>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L790
>>>
>>> The Go SDK rather heavily biases towards using the standard coders for their closest language equivalents rather than going into override/custom specified soup. (It's not possible to globally override the coders for the '[]byte' and 'string' types, nor is there often reason to.)
>>>
>>> On Wed, Sep 8, 2021, 2:56 PM Robert Bradshaw <[email protected]> wrote:
>>>
>>>> On Wed, Sep 8, 2021 at 1:48 PM Jack McCluskey <[email protected]> wrote:
>>>>
>>>>> Hey all,
>>>>>
>>>>> Just catching up on the thread since I did the TestStream Go SDK implementation. The discussion about length prefixing behavior for known vs. unknown coders is interesting, since we ran into strings and byte slices getting extra length prefixes attached to them by Flink despite being known coders.
>>>>
>>>> Known to who?
>>>>
>>>>> Based on what's been said, that isn't expected behavior, right?
>>>>
>>>> No, it's not.
>>>>
>>>> I would check to make sure the Go SDK is respecting the Coder (length prefixed or not) that's set on the channel, rather than guessing at what it expects it to be based on the Go type.
>>>>
>>>>> On Tue, Sep 7, 2021 at 2:46 PM Jan Lukavský <[email protected]> wrote:
>>>>>
>>>>>> On 9/7/21 6:02 PM, Reuven Lax wrote:
>>>>>>
>>>>>> Historically the DataflowRunner has been much more careful about not breaking update, since this is a frequent operation by Dataflow users. I think we've been less careful about other runners, but as we see clearly here Flink users do care about this as well, so we should probably test upgrade compatibility for Flink.
>>>>>>
>>>>>> One strategy that Dataflow uses is to avoid embedding the Java serialized form of a Coder in the graph, as this is a much higher risk of breakage (as we see with the issue you linked to). Possibly similar strategies should be investigated for Flink.
>>>>>>
>>>>>> +1, that would be great!
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Mon, Sep 6, 2021 at 1:29 AM Jan Lukavský <[email protected]> wrote:
>>>>>>
>>>>>>> > Unfortunately the most basic coders (e.g. bytes, string, kv, iterable) care about Context because they predated this deprecation, and changing coders is hard (due to no way to update the encoding for a streaming pipeline).
>>>>>>>
>>>>>>> This is unrelated, but - regarding changing coders due to concerns about pipeline upgrades, we break this quite often, at least for some runners. Most recently [1].
>>>>>>>
>>>>>>> > It is currently the latter for runners using this code (which not all do, e.g. the ULR and Dataflow runners). I don't think we want to ossify this decision as part of the spec. (Note that even what's "known" and "unknown" can change from runner to runner.)
>>>>>>>
>>>>>>> This is interesting and unexpected for me. How do runners decide about how they encode elements between the SDK harness and the runner? How do they inform the SDK harness about this decision? My impression was that this is well-defined at the model level. If not, then we have the reason for misunderstanding in this conversation. :-)
>>>>>>> Jan
>>>>>>>
>>>>>>> [1] https://lists.apache.org/thread.html/r51ee0bbaba2dcef13524a189c1f579f209483418a1568acff0e2c789%40%3Cdev.beam.apache.org%3E
>>>>>>>
>>>>>>> On 9/4/21 7:32 PM, Robert Bradshaw wrote:
>>>>>>> > On Sat, Sep 4, 2021 at 6:52 AM Jan Lukavský <[email protected]> wrote:
>>>>>>> >> On 9/3/21 9:50 PM, Robert Bradshaw wrote:
>>>>>>> >>> On Fri, Sep 3, 2021 at 11:42 AM Jan Lukavský <[email protected]> wrote:
>>>>>>> >>>> Hi Robert,
>>>>>>> >>>>
>>>>>>> >>>>> There's another hitch here for TestStream. For historical reasons, coders actually represent two encodings: nested (aka self delimiting) and unnested. TestStream elements are given as unnested encoded bytes, but the nested encoding is required for sending data to the SDK. The runner can't go from <nested encoding> to <unnested encoding> for an arbitrary unknown coder.
>>>>>>> >>>>>
>>>>>>> >>>>> (Even if it weren't for this complication, to be able to send already encoded bytes of an unknown coder to the SDK will also complicate the logic in choosing the coder to be used for the channel and sending the data, which is some of what you're running into (but can be solved differently for inlined reads as the coder can always be known by the runner).)
>>>>>>> >>>>
>>>>>>> >>>> It is hard for me to argue with "historical reasons". But - the "nested" and "unnested" coders look very similar to SDK-coder and runner-coder spaces.
>>>>>>> >>>
>>>>>>> >>> Unfortunately, they're actually orthogonal to that.
>>>>>>> >>
>>>>>>> >> Hm, do you mean the Context passed to the encode/decode method? [1] That seems to be deprecated, I assume that most coders use the default implementation and simply ignore the Context?
>>>>>>> >
>>>>>>> > Unfortunately the most basic coders (e.g. bytes, string, kv, iterable) care about Context because they predated this deprecation, and changing coders is hard (due to no way to update the encoding for a streaming pipeline).
>>>>>>> >
>>>>>>> >> Even if not - whether or not the elements are encoded using NESTED Context or UNNESTED Context should be part of the contract of TestStream, right? Most likely it is the UNNESTED one, if I understand correctly what that does. Under what conditions is the deprecated encode/decode method used?
>>>>>>> >
>>>>>>> > Yes, it's the UNNESTED one.
>>>>>>> >
>>>>>>> >> [1] https://github.com/apache/beam/blob/c4e0b4ac0777f37f5eb775a8a83c56f66b3baac3/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L134
>>>>>>> >>
>>>>>>> >>>> The runner's responsibility is not to go from "<nested encoding>" (SDK coder) to "<unnested encoding>" for an arbitrary coder. That is really impossible. But a coder is a function, right? A function maps from universe A to universe B (in general). TestStream provides a set of elements, and these elements are the "universe". For those elements it also provides the encoded form, which can be interpreted as the definition of the coder.
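To make the nested/unnested (NESTED vs. OUTER Context) distinction discussed above concrete, here is a small self-contained snippet against the Beam Java SDK; it deliberately calls the deprecated Context overload to show that StringUtf8Coder produces different bytes in the two contexts (the nested form is self-delimiting, the outer form is just the raw UTF-8 bytes):

    import java.io.ByteArrayOutputStream;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;

    public class NestedVsOuter {
      public static void main(String[] args) throws Exception {
        StringUtf8Coder coder = StringUtf8Coder.of();

        // Outer (a.k.a. unnested) context: the element is the whole stream,
        // so no delimiter is needed -- just the raw UTF-8 bytes.
        ByteArrayOutputStream outer = new ByteArrayOutputStream();
        coder.encode("hello", outer, Coder.Context.OUTER);

        // Nested context: the element must be self-delimiting, so the coder
        // prepends a varint byte length.
        ByteArrayOutputStream nested = new ByteArrayOutputStream();
        coder.encode("hello", nested, Coder.Context.NESTED);

        System.out.println("outer  : " + outer.size() + " bytes");   // expected 5
        System.out.println("nested : " + nested.size() + " bytes");  // expected 6 (1-byte varint length + 5)
      }
    }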
>>>>>>> >>> The problem here is that there is not "the encoded form" for a Coder but two encoded forms, and we have the wrong one. Things could be made to work if we had the other.
>>>>>>> >>
>>>>>>> >> Which two encoded forms do you refer to? Elements encoded by both the SDK-coder and runner-coder (and I ignore the Context here once again) have the same binary representation (which they must have, otherwise it would be impossible to decode elements coming from the runner to the SDK-harness or vice-versa).
>>>>>>> >>
>>>>>>> >>>> Therefore - technically (and formally) - the SDK coder for the TestStream is known to the runner, regardless of the language the runner is written in.
>>>>>>> >>>>
>>>>>>> >>>> To move this discussion forward, I think we should look for answers to the following questions:
>>>>>>> >>>>
>>>>>>> >>>> a) do we have any clues that show, that the proposed "in runner" solution will not work?
>>>>>>> >>>
>>>>>>> >>> OK, thinking about it some more, in the TestStream, we can use the happy coincidence that
>>>>>>> >>>
>>>>>>> >>> LengthPrefixed(C).encode(x, nested=True) == VarLong.encode(len(C.encode(x, nested=False))) || C.encode(x, nested=False)
>>>>>>> >>>
>>>>>>> >>> (where || denotes concatenation) and the fact that we have
>>>>>>> >>>
>>>>>>> >>> C.encode(x, nested=False)
>>>>>>> >>>
>>>>>>> >>> in hand.
>>>>>>> >>>
>>>>>>> >>> A possible fix here for the OP's question is that when rehydrating the TestStream transform it must behave differently according to the coder used in the subsequent channel (e.g. for known coders, it decodes the elements and emits them directly, but for unknown coders, it prefixes them with their length and emits byte strings). It gets more complicated for nested coders, e.g. for a KV<known-coder, unknown-coder> the channel might be LP(KV<known-coder, unknown-coder>) or KV<known-coder, LP(unknown-coder)>, which have different encodings (and the latter, which is the default, requires transcoding the bytes to inject the length in the middle, which is found by decoding the first component). As well as getting more complex, this really seems to violate the spirit of separation of concerns.
>>>>>>> >>
>>>>>>> >> How do we make the decision if the channel is LP(KV<...>) or KV<LP(unknown), known>? From my understanding it is always the latter, because of [2].
>>>>>>> >>
>>>>>>> >> [2] https://github.com/apache/beam/blob/c4e0b4ac0777f37f5eb775a8a83c56f66b3baac3/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L48
>>>>>>> >
>>>>>>> > It is currently the latter for runners using this code (which not all do, e.g. the ULR and Dataflow runners). I don't think we want to ossify this decision as part of the spec. (Note that even what's "known" and "unknown" can change from runner to runner.)
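The "happy coincidence" stated above can be checked mechanically with the Java SDK coders; a minimal sketch, assuming VarLongCoder as a stand-in for the VarLong.encode in the formula and StringUtf8Coder as the arbitrary coder C:

    import java.io.ByteArrayOutputStream;
    import java.util.Arrays;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.LengthPrefixCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.coders.VarLongCoder;

    public class LengthPrefixIdentity {
      public static void main(String[] args) throws Exception {
        StringUtf8Coder c = StringUtf8Coder.of();
        String x = "some element";

        // Left-hand side: LengthPrefixed(C).encode(x, nested=True).
        // (A length-prefixed encoding is self-delimiting, so nested == outer here.)
        ByteArrayOutputStream lhs = new ByteArrayOutputStream();
        LengthPrefixCoder.of(c).encode(x, lhs);

        // Right-hand side: VarLong.encode(len(C.encode(x, nested=False)))
        //                  || C.encode(x, nested=False)
        ByteArrayOutputStream unnested = new ByteArrayOutputStream();
        c.encode(x, unnested, Coder.Context.OUTER);
        byte[] payload = unnested.toByteArray();

        ByteArrayOutputStream rhs = new ByteArrayOutputStream();
        VarLongCoder.of().encode((long) payload.length, rhs);
        rhs.write(payload);

        // The runner can therefore produce the length-prefixed channel encoding
        // of an *unknown* coder from the unnested bytes alone -- it never needs
        // to decode them.
        System.out.println(Arrays.equals(lhs.toByteArray(), rhs.toByteArray())); // true
      }
    }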
>>>>>>> >>>> b) do we think, that it will not be robust enough to incorporate the other use-cases (like generic transform inlining, taking into account that this applies only to runners that are written in the same language as the submitting SDK, because otherwise, there is nothing to inline)?
>>>>>>> >>>
>>>>>>> >>> Being in the same language is not a prerequisite to "inlining," e.g. the PubSub source on Dataflow is recognized as such and not executed as SDK code but natively.
>>>>>>> >>
>>>>>>> >> Agree, that is actually exactly what happens with the TestStream. The transform need not be in the same language, as long as it is completely understood by the runner, including the SDK-coder (either explicitly - which might be due to the PCollection coder being composed of well-known coders only - or implicitly like in the case of TestStream, where the elements are encoded using the SDK coder).
>>>>>>> >>>
>>>>>>> >>> It is more likely that inlining occurs in the same language if there are UDFs involved.
>>>>>>> >>>
>>>>>>> >>>> I'm convinced, that the TestStream-decode expansion solution is an ad-hoc solution to a generic problem, which is why I'm still bothering this mailing list with my emails on this. :-)
>>>>>>> >>>>
>>>>>>> >>>> WDYT?
>>>>>>> >>>
>>>>>>> >>> While not a solution to the general problem, I think the TestStream-only-does-bytes approach simplifies its definition (primitives should have as simple/easy to implement definitions as possible) and brings it closer to the other root we have: Impulse. (We could go a step further and rather than emitting encoded elements, with the data in the proto itself, it emits sequence numbers, and a subsequent ParDo maps those to concrete elements (e.g. via an in-memory map), but that further step doesn't buy much...)
>>>>>>> >>>
>>>>>>> >>> Only runners that want to do inlining would have to take on the complexity of a fully generic solution.
>>>>>>> >>
>>>>>>> >> I think that if the simplification brings something, we can do that, but I'd like to understand why we cannot (or should not) use the generic solution. I think it definitely *should* be possible to use a generic solution, because otherwise the solution would not be generic. And it would imply, that we are unable to do generic transform inlining, which I would find really strange. That would immediately mean, that we are unable to construct a classical runner as a special case of the portable one, which would be bad I think.
>>>>>>> >>
>>>>>>> >> Are the elements in the TestStreamPayload encoded with the pure SDK-coder, or does this go through the LengthPrefixUnknownCoders logic? If not, then the problem would be there, because that means, that the SDK-coder cannot be (implicitly) defined in the runner. If the elements would be encoded using LP, then it would be possible to decode them using the runner-coder and the problem should be solved, or am I still missing some key parts?
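As an illustration of the KV<known-coder, LP(unknown-coder)> vs. LP(KV<known-coder, unknown-coder>) distinction raised earlier, the following sketch (StringUtf8Coder simply stands in for an "unknown" value coder; it is not actually unknown to Java) encodes the same KV under both layouts. The resulting byte strings differ, which is why rewriting one into the other requires decoding the key to find where the value starts:

    import java.io.ByteArrayOutputStream;
    import org.apache.beam.sdk.coders.KvCoder;
    import org.apache.beam.sdk.coders.LengthPrefixCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.coders.VarLongCoder;
    import org.apache.beam.sdk.values.KV;

    public class LengthPrefixPlacement {
      public static void main(String[] args) throws Exception {
        KV<Long, String> kv = KV.of(42L, "hi");

        // Layout 1: KV<known, LP(unknown)> -- only the value is length-prefixed.
        KvCoder<Long, String> kvWithLpValue =
            KvCoder.of(VarLongCoder.of(), LengthPrefixCoder.of(StringUtf8Coder.of()));
        ByteArrayOutputStream a = new ByteArrayOutputStream();
        kvWithLpValue.encode(kv, a);

        // Layout 2: LP(KV<known, unknown>) -- the whole pair is length-prefixed.
        LengthPrefixCoder<KV<Long, String>> lpWholeKv =
            LengthPrefixCoder.of(KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of()));
        ByteArrayOutputStream b = new ByteArrayOutputStream();
        lpWholeKv.encode(kv, b);

        // The byte sequences differ: to transcode between the two layouts, a
        // runner that does not know the value coder would still have to decode
        // the key to learn how many bytes belong to it.
        System.out.println(toHex(a.toByteArray()));
        System.out.println(toHex(b.toByteArray()));
      }

      private static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte x : bytes) {
          sb.append(String.format("%02x ", x));
        }
        return sb.toString();
      }
    }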
>>>>>>> > Yes, the problem is precisely that there are (unspecified) constraints on the coder used by the TestStreamPayload. Just requiring that it be length prefixed is not enough, you have to make constraints on sometimes pushing down the length prefixing if it's a composite (like a KV) that depend on what the runner is expected to support in terms of composites and/or the choices it makes for the channel (and the runner, not knowing the coder, can't transcode between these choices).
>>>>>>> >
>>>>>>> > The simpler solution is to constrain this coder to just be byte[] rather than let it be a little bit flexible (but not wholly flexible).
>>>>>>> >
>>>>>>> > As for a fully generic solution, I think the issue encountered with inlining Read vs. TestStream is related to this, but not really the same. With TestStream one has an encoded representation of the elements, provided by the SDK, for which the Runner has no SDK representation/execution, whereas with the Reads one has unencoded elements in hand and a Coder that is understood by both (so long as the channel can be negotiated correctly). FWIW, I think the proper solution to inlining a Read (or other Transform that would typically be executed in the SDK) is to treat it as a special environment (where we know logically it can work) and then elide, as possible, the various encodings, grpc calls, etc. that are unneeded as everything is in process.
>>>>>>> >
>>>>>>> >>>> On 9/3/21 7:03 PM, Robert Bradshaw wrote:
>>>>>>> >>>>> On Fri, Sep 3, 2021 at 2:40 AM Jan Lukavský <[email protected]> wrote:
>>>>>>> >>>>>> On 9/3/21 1:06 AM, Robert Bradshaw wrote:
>>>>>>> >>>>>>> On Thu, Sep 2, 2021 at 1:03 AM Jan Lukavský <[email protected]> wrote:
>>>>>>> >>>>>>>> Hi,
>>>>>>> >>>>>>>>
>>>>>>> >>>>>>>> I had some more time thinking about this and I'll try to recap that. First some invariants:
>>>>>>> >>>>>>>>
>>>>>>> >>>>>>>> a) each PCollection<T> has actually two coders - an _SDK coder_ and a _runner coder_. These coders have the property, that each one can _decode_ what the other encoded, but the opposite is not true, the coders cannot _encode_ what the other _decoded_ (in general).
>>>>>>> >>>>>>>>
>>>>>>> >>>>>>>> b) when a PCollection<T> is computed inside an environment, the elements are encoded using the SDK coder on the side of the SDK-harness and decoded using the runner coder after being received in the runner
>>>>>>> >>>>>>>>
>>>>>>> >>>>>>>> c) under specific circumstances, the encode-decode step can be optimized out, that is the case where the SDK coder and all its subcoders are all well-known to the runner (in the present, that means that all the parts are present in the model coders set). The reason for that is that in this specific situation runner_decode(sdk_encode(X)) = X. This property is essential.
>>>>>>> >>>>>>>
>>>>>>> >>>>>>> However, in general, X can only pass from the SDK to the runner (or vice versa) in encoded form.
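Property c) above can be illustrated with ordinary SDK coders: when every component of a coder is a model coder, a "runner-side" coder rebuilt purely from URNs and component structure decodes exactly what the "SDK-side" instance encoded. A rough sketch; the fromUrn mapping below is a hand-rolled stand-in for the registry runners actually use, and the URN-to-class associations are assumed here, not quoted from the codebase:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.util.List;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.KvCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.coders.VarLongCoder;
    import org.apache.beam.sdk.values.KV;

    public class RoundTripThroughModelCoders {

      /** Hand-rolled stand-in for building a coder from URNs + components. */
      static Coder<?> fromUrn(String urn, List<Coder<?>> components) {
        switch (urn) {
          case "beam:coder:varint:v1":
            return VarLongCoder.of();
          case "beam:coder:string_utf8:v1":
            return StringUtf8Coder.of();
          case "beam:coder:kv:v1":
            return KvCoder.of(components.get(0), components.get(1));
          default:
            throw new IllegalArgumentException("Not a model coder: " + urn);
        }
      }

      public static void main(String[] args) throws Exception {
        // "SDK side": the pipeline's coder instance.
        KvCoder<Long, String> sdkCoder = KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of());

        // "Runner side": the same coder, rebuilt only from URNs and structure.
        @SuppressWarnings("unchecked")
        Coder<KV<Long, String>> runnerCoder =
            (Coder<KV<Long, String>>)
                fromUrn("beam:coder:kv:v1",
                    List.of(fromUrn("beam:coder:varint:v1", List.of()),
                            fromUrn("beam:coder:string_utf8:v1", List.of())));

        ByteArrayOutputStream encoded = new ByteArrayOutputStream();
        sdkCoder.encode(KV.of(7L, "seven"), encoded);

        KV<Long, String> decoded =
            runnerCoder.decode(new ByteArrayInputStream(encoded.toByteArray()));

        // runner_decode(sdk_encode(x)) == x, which is what allows the
        // encode/decode pair to be optimized away when all components are
        // model coders.
        System.out.println(decoded);
      }
    }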
>>>>>>> >>>>>> In general yes, but we are (mostly) talking transform inlining here, so in that particular situation, the elements might be passed in decoded form.
>>>>>>> >>>>>>
>>>>>>> >>>>>>>> d) from b) it immediately follows, that when a PTransform does not run in an environment (and this might be due to the transform being runner native, inlined, a source (e.g. Impulse or TestStream)) the elements have to be encoded by the SDK coder, immediately followed by a decode by the runner coder. That (surprisingly) applies even to situations when the runner is implemented using a different language than the client SDK, because it implies that the type of produced elements must be one of the types encoded using model coders (well-known to the runner, otherwise the SDK will not be able to consume it). But - due to property c) - this means that this encode-decode step can be optimized out. This does not mean that it is not (logically) present, though. This is exactly the case of the native Impulse transform.
>>>>>>> >>>>>>>>
>>>>>>> >>>>>>>> Now, from that we can conclude that on the boundary between executable stages, or between a runner (inlined) transform and an executable stage, each PCollection has to be encoded using the SDK coder and immediately decoded by the runner coder, *unless this can be optimized out* by property c).
>>>>>>> >>>>>>>>
>>>>>>> >>>>>>>> This gives us two options where to implement this encode/decode step:
>>>>>>> >>>>>>>>
>>>>>>> >>>>>>>> 1) completely inside the runner, with the possibility to optimize the encode/decode step by identity under the right circumstances
>>>>>>> >>>>>>>>
>>>>>>> >>>>>>>> 2) partly in the runner and partly in the SDK - that is, we encode elements of the PCollection using the SDK coder into bytes, pass those to the SDK harness and apply a custom decode step there. This works because SDK coder encoded elements are in byte[], and that is a well-known coder type. We again only leverage property c) and optimize the SDK coder encode, runner decode step out.
>>>>>>> >>>>>>>>
>>>>>>> >>>>>>>> The option 2) is exactly the proposal of TestStream producing byte[] and decoding inside the SDK-harness; the TestStream is actually an inlined transform, the elements are produced directly in the runner (the SDK coder is not known to the runner, but that does not matter, because the elements are already encoded by the client).
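Option 2) above would look roughly like the following on the SDK side (an illustrative sketch, not the actual TestStream expansion): the client pre-encodes the elements with the SDK coder, the runner only ever handles byte[] (a coder it always knows), and a small decode step runs in the SDK harness.

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;

    /**
     * Illustrative sketch of option 2): the runner emits the client-encoded
     * byte[] payloads unchanged, and this transform, running in the SDK harness,
     * decodes them back into T using the SDK coder (which only the SDK knows).
     */
    public class DecodeViaSdkCoder<T> extends PTransform<PCollection<byte[]>, PCollection<T>> {

      private final Coder<T> sdkCoder;

      public DecodeViaSdkCoder(Coder<T> sdkCoder) {
        this.sdkCoder = sdkCoder;
      }

      @Override
      public PCollection<T> expand(PCollection<byte[]> encoded) {
        return encoded
            .apply(
                ParDo.of(
                    new DoFn<byte[], T>() {
                      @ProcessElement
                      public void processElement(@Element byte[] payload, OutputReceiver<T> out)
                          throws IOException {
                        // The bytes were produced with sdkCoder on the client; decode
                        // them here, inside the environment that understands that coder.
                        out.output(sdkCoder.decode(new ByteArrayInputStream(payload)));
                      }
                    }))
            .setCoder(sdkCoder);
      }
    }

The runner side then needs nothing beyond the bytes coder; the trade-off Jan raises in point iii) below is that elements may cross into the SDK harness solely to be decoded and re-encoded.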
>>>>>>> >>>>>>>> From the above it seems to me, that option 1) should be preferred, because:
>>>>>>> >>>>>>>>
>>>>>>> >>>>>>>> i) it is generic, applicable to all inlined transforms, any sources
>>>>>>> >>>>>>>>
>>>>>>> >>>>>>>> ii) it is consistent with how things logically work underneath
>>>>>>> >>>>>>>>
>>>>>>> >>>>>>>> iii) it offers better room for optimization - option 2) might result in cases when the elements are passed from the runner to the SDK-harness only for the sake of being decoded from the SDK coder and immediately encoded back using the SDK coder and returned back to the runner. This would be the case when TestStream would be directly consumed by an inlined (or external) transform.
>>>>>>> >>>>>>>
>>>>>>> >>>>>>> (1) is not possible if the Coder in question is not known to the Runner, which is why I proposed (2).
>>>>>>> >>>>>>
>>>>>>> >>>>>> There is no particular need for the coder to be known. If a transform is to be inlined, what *has* to be known is the SDK-encoded form of the elements. That holds true if:
>>>>>>> >>>>>>
>>>>>>> >>>>>> a) either the SDK coder is known, or
>>>>>>> >>>>>>
>>>>>>> >>>>>> b) the encoded form of the produced elements is known in advance
>>>>>>> >>>>>>
>>>>>>> >>>>>> For TestStream it is the case b). For inlined primitive Read (or any other transform which executes code) it is a).
>>>>>>> >>>>>
>>>>>>> >>>>> There's another hitch here for TestStream. For historical reasons, coders actually represent two encodings: nested (aka self delimiting) and unnested. TestStream elements are given as unnested encoded bytes, but the nested encoding is required for sending data to the SDK. The runner can't go from <nested encoding> to <unnested encoding> for an arbitrary unknown coder.
>>>>>>> >>>>>
>>>>>>> >>>>> (Even if it weren't for this complication, to be able to send already encoded bytes of an unknown coder to the SDK will also complicate the logic in choosing the coder to be used for the channel and sending the data, which is some of what you're running into (but can be solved differently for inlined reads as the coder can always be known by the runner).)
>>>>>
>>>>> --
>>>>> Jack McCluskey
>>>>> SWE - DataPLS PLAT/ Beam Go
>>>>> RDU
>>>>> [email protected]
