IIUC that means that for TestStream a runner should: * Look at the coder being used in TestStream * Look at the list of SDK coder capabilities. * If the test stream coder is built of known coder capabilities, then it should not try to LP the provided encoded bytes, but instead simply window wrap them?
Is that right? On Thu, Sep 9, 2021, 12:58 PM Robert Bradshaw <[email protected]> wrote: > Go doesn't have a (portable, cross-language) runner. It's fine for SDKs to > enumerate the coders they understand. Runners, however, should respect what > SDKs declare. > > On Thu, Sep 9, 2021 at 12:54 PM Jack McCluskey <[email protected]> > wrote: > >> Go also doesn't do this, instead maintaining a list of known standard >> coders and using that as a list of capabilities: >> https://github.com/apache/beam/blob/4bc870806099f03265e7dfb48b142f00cee42f47/sdks/go/pkg/beam/core/runtime/graphx/coder.go#L59 >> >> On Thu, Sep 9, 2021 at 1:29 PM Robert Bradshaw <[email protected]> >> wrote: >> >>> You are right, Java does not implement this correctly. It should be >>> querying the capabilities section of the environment proto. (For java >>> environments, this is populated from ModelCoders, e.g. >>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java#L386 >>> ) >>> >>> Looks like Python doesn't do any better: >>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L459 >>> >>> >>> >>> On Thu, Sep 9, 2021 at 10:13 AM Jan Lukavský <[email protected]> wrote: >>> >>>> This makes a *lot* of sense. But it seems to me, that this is not the >>>> way Java-based runners - that use runners-core-construction-java module - >>>> handle it. If I interpret it correctly, then the set of ModelCoders is >>>> hard-coded [1] and essentially required to be known by all SDKs [2]. >>>> >>>> There seems to be no negotiation between what SDK harness knows and >>>> what the runner knows. The runner might be able to define the wire coder >>>> for the SDK (via the ProcessBundleDescriptor), but the SDK (for Java >>>> runners) seems not to be able to play any role in this [3]. Therefore I >>>> think that if an SDK does not know the set of Java ModelCoders, then the >>>> runner and the SDK might not agree on the binary encoding of the elements >>>> (BTW, which is why Java Coders cannot be part of ModelCoders and I finally >>>> understand why I had such troubles adding it there - it cannot be there). >>>> >>>> Is it possible we are missing some part of this runner-to-sdk coder >>>> negotiation in runners-core-contruction-java? >>>> >>>> [1] >>>> https://github.com/apache/beam/blob/a871a494a9935093ab5f42d88d584d32aa233b8a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java#L75 >>>> >>>> [2] >>>> https://github.com/apache/beam/blob/a871a494a9935093ab5f42d88d584d32aa233b8a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L62 >>>> >>>> [3] >>>> https://github.com/apache/beam/blob/a871a494a9935093ab5f42d88d584d32aa233b8a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L104 >>>> On 9/9/21 12:18 AM, Robert Bradshaw wrote: >>>> >>>> The whole notion of an absolute set of known coders is a misnomer. It >>>> would require all Runners and SDKs to be updated synchronously for every >>>> new coder they might want to share. >>>> >>>> Instead, what we have are >>>> >>>> * Standard Coders which have well-defined, language-agnostic >>>> representations and encodings, which may be used for interoperability and >>>> efficiency, and >>>> * Required Coders which are the minimum needed to execute the pipeline. >>>> >>>> The latter consists only of bytes (for impulse), kv and iterable (for >>>> GBK), windowed value (for windowing information) and length prefix (to be >>>> able to handle anything else). >>>> >>>> >>>> On Wed, Sep 8, 2021 at 3:03 PM Robert Burke <[email protected]> wrote: >>>> >>>>> Is the claim that the Standard bytes and String_utf8 coders are not >>>>> "known coders"? >>>>> >>>>> What's the point of the standard coders if they are not the canonical >>>>> "known coders" that can generally be expected to be known by runners/other >>>>> SDKs? >>>>> >>>>> >>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L790 >>>>> >>>>> The Go SDK rather heavily biases towards using the standard coders for >>>>> their closes language equivalents rather than going into override/custom >>>>> specified soup. (It's not possible to globally override the coders for the >>>>> '[]byte' and 'string' types, nor is there often reason to.) >>>>> >>>>> On Wed, Sep 8, 2021, 2:56 PM Robert Bradshaw <[email protected]> >>>>> wrote: >>>>> >>>>>> On Wed, Sep 8, 2021 at 1:48 PM Jack McCluskey <[email protected]> >>>>>> wrote: >>>>>> >>>>>>> Hey all, >>>>>>> >>>>>>> Just catching up on the thread since I did the TestStream Go SDK >>>>>>> implementation. The discussion about length prefixing behavior for known >>>>>>> vs. unknown coders is interesting, since we ran into strings and byte >>>>>>> slices getting extra length prefixes attached to them by Flink despite >>>>>>> being known coders. >>>>>>> >>>>>> >>>>>> Known to who? >>>>>> >>>>>> >>>>>>> Based on what's been said, that isn't expected behavior, right? >>>>>>> >>>>>> >>>>>> No, it's not. >>>>>> >>>>>> I would check to make sure the Go SDK is respecting the Coder (length >>>>>> prefixed or not) that's set on the channel, rather than guessing at what >>>>>> it >>>>>> expects it to be based on the Go type. >>>>>> >>>>>> >>>>>>> On Tue, Sep 7, 2021 at 2:46 PM Jan Lukavský <[email protected]> wrote: >>>>>>> >>>>>>>> On 9/7/21 6:02 PM, Reuven Lax wrote: >>>>>>>> >>>>>>>> Historically the DataflowRunner has been much more careful about >>>>>>>> not breaking update, since this is a frequent operation by Dataflow >>>>>>>> users. >>>>>>>> I think we've been less careful aboutt other runners, but as we see >>>>>>>> clearly >>>>>>>> here Fllnk users do care about this as well, so we should probably test >>>>>>>> upgrade compatibility for Flink. >>>>>>>> >>>>>>>> One strategy that Dataflow uses is to avoid embedding the Java >>>>>>>> serialized form of a Coder in the graph, as this is a much higher risk >>>>>>>> of >>>>>>>> breakage (as we see with the issue you llnked to). Possibly similar >>>>>>>> strategies should be investigated for Fllink. >>>>>>>> >>>>>>>> +1, that would be great! >>>>>>>> >>>>>>>> >>>>>>>> Reuven >>>>>>>> >>>>>>>> On Mon, Sep 6, 2021 at 1:29 AM Jan Lukavský <[email protected]> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> > Unfortunately the most basic coders (e.g. bytes, string, kv, >>>>>>>>> iterable) >>>>>>>>> > care about Context because they predated this deprecation, and >>>>>>>>> > changing coders is hard (due to no way to update the encoding >>>>>>>>> for a >>>>>>>>> > streaming pipeline). >>>>>>>>> This is unrelated, but - regarding changing coders due to concerns >>>>>>>>> about >>>>>>>>> pipeline upgrades, we break this quite often, at least for some >>>>>>>>> runners. >>>>>>>>> Most recently [1]. >>>>>>>>> >>>>>>>>> > It is currently the latter for runners using this code (which >>>>>>>>> not all >>>>>>>>> > do, e.g. the ULR and Dataflow runners). I don't think we want to >>>>>>>>> > ossify this decision as part of the spec. (Note that even what's >>>>>>>>> > "known" and "unknown" can change from runner to runner.) >>>>>>>>> This is interesting and unexpected for me. How do runners decide >>>>>>>>> about >>>>>>>>> how they encode elements between SDK harness and the runner? How >>>>>>>>> do they >>>>>>>>> inform the SDK harness about this decision? My impression was that >>>>>>>>> this >>>>>>>>> is well-defined at the model level. If not, then we have the >>>>>>>>> reason for >>>>>>>>> misunderstanding in this conversation. :-) >>>>>>>>> >>>>>>>>> Jan >>>>>>>>> >>>>>>>>> [1] >>>>>>>>> >>>>>>>>> https://lists.apache.org/thread.html/r51ee0bbaba2dcef13524a189c1f579f209483418a1568acff0e2c789%40%3Cdev.beam.apache.org%3E >>>>>>>>> >>>>>>>>> On 9/4/21 7:32 PM, Robert Bradshaw wrote: >>>>>>>>> > On Sat, Sep 4, 2021 at 6:52 AM Jan Lukavský <[email protected]> >>>>>>>>> wrote: >>>>>>>>> >> On 9/3/21 9:50 PM, Robert Bradshaw wrote: >>>>>>>>> >> >>>>>>>>> >>> On Fri, Sep 3, 2021 at 11:42 AM Jan Lukavský<[email protected]> >>>>>>>>> wrote: >>>>>>>>> >>>> Hi Robert, >>>>>>>>> >>>> >>>>>>>>> >>>>> There's another hitch here for TestStream. For historical >>>>>>>>> reasons, >>>>>>>>> >>>>> coders actually represent two encodings: nested (aka self >>>>>>>>> delimiting) >>>>>>>>> >>>>> and unnested. TestStream elements are given as unnested >>>>>>>>> encoded bytes, >>>>>>>>> >>>>> but the nested encoding is required for sending data to the >>>>>>>>> SDK. The >>>>>>>>> >>>>> runner can't go from <nested encoding> to <unnested >>>>>>>>> encoding> for an >>>>>>>>> >>>>> arbitrary unknown coder. >>>>>>>>> >>>>> >>>>>>>>> >>>>> (Even if it weren't for this complication, to be able to >>>>>>>>> send already >>>>>>>>> >>>>> encoded bytes of an unknown coder to the SDK will also >>>>>>>>> complicate the >>>>>>>>> >>>>> logic in choosing the coder to be used for the channel and >>>>>>>>> sending the >>>>>>>>> >>>>> data, which is some of what you're running into (but can be >>>>>>>>> solved >>>>>>>>> >>>>> differently for inlined reads as the coder can always be >>>>>>>>> known by the >>>>>>>>> >>>>> runner).) >>>>>>>>> >>>> It is hard for me to argue with "historical reasons". But - >>>>>>>>> the "nested" >>>>>>>>> >>>> and "unnested" coders look very similar to SDK-coder and >>>>>>>>> runner-coder >>>>>>>>> >>>> spaces. >>>>>>>>> >>> Unfortunately, they're actually orthogonal to that. >>>>>>>>> >> Hm, do you mean the Context passed to the encode/decode method? >>>>>>>>> [1] That >>>>>>>>> >> seems to be deprecated, I assume that most coders use the >>>>>>>>> default >>>>>>>>> >> implementation and simply ignore the Context? >>>>>>>>> > Unfortunately the most basic coders (e.g. bytes, string, kv, >>>>>>>>> iterable) >>>>>>>>> > care about Context because they predated this deprecation, and >>>>>>>>> > changing coders is hard (due to no way to update the encoding >>>>>>>>> for a >>>>>>>>> > streaming pipeline). >>>>>>>>> > >>>>>>>>> >> Even if not - whether or >>>>>>>>> >> not the elements are encoded using NESTED Context or UNNESTED >>>>>>>>> Context >>>>>>>>> >> should be part of the contract of TestStream, right? Most >>>>>>>>> likely it is >>>>>>>>> >> the UNNESTED one, if I understand correctly what that does. >>>>>>>>> Under what >>>>>>>>> >> conditions is the deprecated encode/decode method used? >>>>>>>>> > Yes, it's the UNNESTED one. >>>>>>>>> > >>>>>>>>> >> [1] >>>>>>>>> >> >>>>>>>>> https://github.com/apache/beam/blob/c4e0b4ac0777f37f5eb775a8a83c56f66b3baac3/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L134 >>>>>>>>> >> >>>>>>>>> >>>> The runner's responsibility is not to go from "<nested >>>>>>>>> >>>> encoding>" (SDK coder) to "<unnested encoding>" for arbitrary >>>>>>>>> coder. >>>>>>>>> >>>> That is really impossible. But a coder is a function, right? >>>>>>>>> Function >>>>>>>>> >>>> maps from universe A to universe B (in general). TestStream >>>>>>>>> provides a >>>>>>>>> >>>> set of elements, and these elements are the "universe". For >>>>>>>>> those >>>>>>>>> >>>> elements it also provides the encoded form, which can be >>>>>>>>> interpreted as >>>>>>>>> >>>> the definition of the coder. >>>>>>>>> >>> The problem here is that there is not "the encoded form" for a >>>>>>>>> Coder >>>>>>>>> >>> but two encoded forms, and we have the wrong one. Things could >>>>>>>>> be made >>>>>>>>> >>> to work if we had the other. >>>>>>>>> >> Which two encoded forms do you refer to? Elements encoded by >>>>>>>>> both the >>>>>>>>> >> SDK-coder and runner-coder (and I ignore the Context here once >>>>>>>>> again) >>>>>>>>> >> have the same binary representation (which they must have, >>>>>>>>> otherwise it >>>>>>>>> >> would be impossible to decode elements coming from the runner >>>>>>>>> to the >>>>>>>>> >> SDK-harness or vice-versa). >>>>>>>>> >>>> Therefore - technically (and formally) - >>>>>>>>> >>>> the SDK coder for the TestStream is known to the runner, >>>>>>>>> regardless of >>>>>>>>> >>>> the language the runner is written in. >>>>>>>>> >>>> >>>>>>>>> >>>> To move this discussion forward, I think we should look for >>>>>>>>> answers to >>>>>>>>> >>>> the following questions: >>>>>>>>> >>>> >>>>>>>>> >>>> a) do we have any clues that show, that the proposed "in >>>>>>>>> runner" >>>>>>>>> >>>> solution will not work? >>>>>>>>> >>> OK, thinking about it some more, in the TestStream, we can use >>>>>>>>> the >>>>>>>>> >>> happy coincidence that >>>>>>>>> >>> >>>>>>>>> >>> LengthPrefixed(C).encode(x, nested=True) == >>>>>>>>> >>> VarLong.encode(len(C.encode(x, nested=False))) || C.encode(x, >>>>>>>>> >>> nested=False) >>>>>>>>> >>> >>>>>>>>> >>> (where || denotes concatenation) and the fact that we have >>>>>>>>> >>> >>>>>>>>> >>> C.encode(x, nested=False) >>>>>>>>> >>> >>>>>>>>> >>> in hand. >>>>>>>>> >>> >>>>>>>>> >>> A possible fix here for the OP's question is that when >>>>>>>>> rehydrating the >>>>>>>>> >>> TestStream transform it must behave differently according to >>>>>>>>> the coder >>>>>>>>> >>> used in the subsequent channel (e.g. for known coders, it >>>>>>>>> decodes the >>>>>>>>> >>> elements and emits them directly, but for unknown coders, it >>>>>>>>> prefixes >>>>>>>>> >>> them with their length and emits byte strings. It gets more >>>>>>>>> >>> complicated for nested coders, e.g. for a KV<known-coder, >>>>>>>>> >>> unknown-coder> the channel might be LP(KV<known-coder, >>>>>>>>> unknown-coder)) >>>>>>>>> >>> or KV<known-coder, LP(unknown-coder)) which have different >>>>>>>>> encodings >>>>>>>>> >>> (and the latter, which is the default, requires transcoding >>>>>>>>> the bytes >>>>>>>>> >>> to inject the length in the middle which is found by decoding >>>>>>>>> the >>>>>>>>> >>> first component). As well as getting more complex, this really >>>>>>>>> seems >>>>>>>>> >>> to violate the spirit of separation of concerns. >>>>>>>>> >> How do we make the decision if the channel is LP<KV<..>> or >>>>>>>>> >> KV<LP<unknown>, known>? From my understanding it is always the >>>>>>>>> latter, >>>>>>>>> >> because of [2]. >>>>>>>>> >> >>>>>>>>> >> [2] >>>>>>>>> >> >>>>>>>>> https://github.com/apache/beam/blob/c4e0b4ac0777f37f5eb775a8a83c56f66b3baac3/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L48 >>>>>>>>> > It is currently the latter for runners using this code (which >>>>>>>>> not all >>>>>>>>> > do, e.g. the ULR and Dataflow runners). I don't think we want to >>>>>>>>> > ossify this decision as part of the spec. (Note that even what's >>>>>>>>> > "known" and "unknown" can change from runner to runner.) >>>>>>>>> > >>>>>>>>> >>>> b) do we think, that it will not be robust enough to >>>>>>>>> incorporate the >>>>>>>>> >>>> other use-cases (line generic transform inlining, taking into >>>>>>>>> account >>>>>>>>> >>>> that this applies only to runners that are written in the >>>>>>>>> same language >>>>>>>>> >>>> as the submitting SDK, because otherwise, there is nothing to >>>>>>>>> inline)? >>>>>>>>> >>> Being in the same language is not a prerequisite to >>>>>>>>> "inlining," e.g. >>>>>>>>> >>> the PubSub source on Dataflow is recognized as such and not >>>>>>>>> executed >>>>>>>>> >>> as SDK code but natively. >>>>>>>>> >> Agree, that is actually exactly what happens with the >>>>>>>>> TestStream. The >>>>>>>>> >> transform need not be in the same language, as long as it is >>>>>>>>> completely >>>>>>>>> >> understood by the runner, including the SDK-coder (either >>>>>>>>> explicitly - >>>>>>>>> >> which might be due to the PCollection coder being composed of >>>>>>>>> well-known >>>>>>>>> >> coders only, or implicitly like in the case of TestStream, >>>>>>>>> where the >>>>>>>>> >> elements are encoded using the SDK coder. >>>>>>>>> >>> It is more likely that inlining occurs in the same language if >>>>>>>>> there >>>>>>>>> >>> are UDFs involved. >>>>>>>>> >>> >>>>>>>>> >>>> I'm convinced, that the TestStream-decode expansion solution >>>>>>>>> is an >>>>>>>>> >>>> ad-hoc solution to a generic problem, which is why I'm still >>>>>>>>> bothering >>>>>>>>> >>>> this mailing list with my emails on this. :-) >>>>>>>>> >>>> >>>>>>>>> >>>> WDYT? >>>>>>>>> >>> While not a solution to the general problem, I think the >>>>>>>>> >>> TestStream-only-does-bytes simplifies its definition >>>>>>>>> (primitives >>>>>>>>> >>> should have as simple/easy to implement definitions as >>>>>>>>> possible) and >>>>>>>>> >>> brings it closer to the other root we have: Impulse. (We could >>>>>>>>> go a >>>>>>>>> >>> step further and rather than emitting encoded elements, with >>>>>>>>> the data >>>>>>>>> >>> in the proto itself, it emits sequence numbers, and a >>>>>>>>> subsequent ParDo >>>>>>>>> >>> maps those to concrete elements (e.g. via an in-memory map), >>>>>>>>> but that >>>>>>>>> >>> further step doesn't buy much...) >>>>>>>>> >>> >>>>>>>>> >>> Only runners that want to do inlining would have to take on the >>>>>>>>> >>> complexity of a fully generic solution. >>>>>>>>> >> I think that if the simplification brings something, we can do >>>>>>>>> that, but >>>>>>>>> >> I'd like to understand why we cannot (or should not) use the >>>>>>>>> generic >>>>>>>>> >> solution. I think it definitely *should* be possible to use a >>>>>>>>> generic >>>>>>>>> >> solution, because otherwise the solution would not be generic. >>>>>>>>> And it >>>>>>>>> >> would imply, that we are unable to do generic transform >>>>>>>>> inlining, which >>>>>>>>> >> I would find really strange. That would immediately mean, that >>>>>>>>> we are >>>>>>>>> >> unable to construct classical runner as a special case of the >>>>>>>>> portable >>>>>>>>> >> one, which would be bad I think. >>>>>>>>> >> >>>>>>>>> >> The elements in the TestStreamPayload are encoded with pure >>>>>>>>> SDK-coder, >>>>>>>>> >> or does this go through the LengthPrefixUnknownCoders logic? If >>>>>>>>> not, >>>>>>>>> >> then the problem would be there, because that means, that the >>>>>>>>> SDK-coder >>>>>>>>> >> cannot be (implicitly) defined in the runner. If the elements >>>>>>>>> would be >>>>>>>>> >> encoded using LP, then it would be possible to decode them using >>>>>>>>> >> runner-coder and the problem should be solved, or am I still >>>>>>>>> missing >>>>>>>>> >> some key parts? >>>>>>>>> > Yes, the problem is precisely that there are (unspecified) >>>>>>>>> constraints >>>>>>>>> > on the coder used by the TestStreamPayload. Just requiring that >>>>>>>>> it be >>>>>>>>> > length prefixed is not enough, you have to make constraints on >>>>>>>>> > sometimes pushing down the length prefixing if it's a composite >>>>>>>>> (like >>>>>>>>> > a KV) that depend on what the runner is expected to support in >>>>>>>>> terms >>>>>>>>> > of composites and/or the choices it chooses for the channel (and >>>>>>>>> the >>>>>>>>> > runner, not knowing the coder, can't transcode between these >>>>>>>>> choices). >>>>>>>>> > >>>>>>>>> > The simpler solution is to constrain this coder to just be byte[] >>>>>>>>> > rather than let it be a little bit flexible (but not wholly >>>>>>>>> flexible). >>>>>>>>> > >>>>>>>>> > As for a fully generic solution, I think the issue encountered >>>>>>>>> with >>>>>>>>> > inlining Read vs. TestStream are related to this, but not really >>>>>>>>> the >>>>>>>>> > same. With TestStream one has an encoded representation of the >>>>>>>>> > elements provided by the SDK that the Runner and has no SDK >>>>>>>>> > representation/execution whereas with the Reads one has unencoded >>>>>>>>> > elements in hand and a Coder that is understood by both (so long >>>>>>>>> as >>>>>>>>> > the channel can be negotiated correctly). FWIW, I think the >>>>>>>>> proper >>>>>>>>> > solution to inlining a Read (or other Transform that would >>>>>>>>> typically >>>>>>>>> > be executed in the SDK) is to treat it as a special environment >>>>>>>>> (where >>>>>>>>> > we know logically it can work) and then elide, as possible, the >>>>>>>>> > various encodings, grpc calls, etc. that are unneeded as >>>>>>>>> everything is >>>>>>>>> > in process. >>>>>>>>> > >>>>>>>>> >>>> On 9/3/21 7:03 PM, Robert Bradshaw wrote: >>>>>>>>> >>>>> On Fri, Sep 3, 2021 at 2:40 AM Jan Lukavský<[email protected]> >>>>>>>>> wrote: >>>>>>>>> >>>>>> On 9/3/21 1:06 AM, Robert Bradshaw wrote: >>>>>>>>> >>>>>>> On Thu, Sep 2, 2021 at 1:03 AM Jan Lukavský< >>>>>>>>> [email protected]> wrote: >>>>>>>>> >>>>>>>> Hi, >>>>>>>>> >>>>>>>> >>>>>>>>> >>>>>>>> I had some more time thinking about this and I'll try to >>>>>>>>> recap that. >>>>>>>>> >>>>>>>> First some invariants: >>>>>>>>> >>>>>>>> >>>>>>>>> >>>>>>>> a) each PCollection<T> has actually two coders - an >>>>>>>>> _SDK coder_ and a >>>>>>>>> >>>>>>>> _runner coder_. These coders have the property, that each >>>>>>>>> one can >>>>>>>>> >>>>>>>> _decode_ what the other encoded, but the opposite is not >>>>>>>>> true, the >>>>>>>>> >>>>>>>> coders cannot _encode_ what the other _decoded_ (in >>>>>>>>> general). >>>>>>>>> >>>>>>>> >>>>>>>>> >>>>>>>> b) when is a PCollection<T> computed inside an >>>>>>>>> environment, the >>>>>>>>> >>>>>>>> elements are encoded using SDK coder on the side of >>>>>>>>> SDK-harness and >>>>>>>>> >>>>>>>> decoded using runner coder after receiving in the runner >>>>>>>>> >>>>>>>> >>>>>>>>> >>>>>>>> c) under specific circumstances, the encode-decode >>>>>>>>> step can be >>>>>>>>> >>>>>>>> optimized out, that is the case where the SDK coder and >>>>>>>>> all its >>>>>>>>> >>>>>>>> subcoders are all well-known to the runner (in the >>>>>>>>> present, that means >>>>>>>>> >>>>>>>> that all the parts present in the model coders set). The >>>>>>>>> reason for that >>>>>>>>> >>>>>>>> is that in this specific situation >>>>>>>>> runner_decode(sdk_encode(X)) = X. >>>>>>>>> >>>>>>>> This property is essential. >>>>>>>>> >>>>>>> However, in general, X can only pass from the SDK to the >>>>>>>>> runner (or >>>>>>>>> >>>>>>> vice versa) in encoded form. >>>>>>>>> >>>>>> In general yes, but we are (mostly) talking transform >>>>>>>>> inlining here, so >>>>>>>>> >>>>>> it that particular situation, the elements might be passed >>>>>>>>> in decoded form. >>>>>>>>> >>>>>>>> d) from b) immediately follows, that when a >>>>>>>>> PTransform does not run in >>>>>>>>> >>>>>>>> an environment (and this might be due to the transform >>>>>>>>> being runner >>>>>>>>> >>>>>>>> native, inlined, source (e.g. Impulse or TestStream)) the >>>>>>>>> elements have >>>>>>>>> >>>>>>>> to be encoded by SDK coder, immediately following decode >>>>>>>>> by runner >>>>>>>>> >>>>>>>> coder. That (surprisingly) applies even to situations >>>>>>>>> when runner is >>>>>>>>> >>>>>>>> implemented using different language than the client SDK, >>>>>>>>> because it >>>>>>>>> >>>>>>>> implies that the type of produced elements must be one of >>>>>>>>> types encoded >>>>>>>>> >>>>>>>> using model coders (well-known to the runner, otherwise >>>>>>>>> the SDK will not >>>>>>>>> >>>>>>>> be able to consume it). But - due to property c) - this >>>>>>>>> means that this >>>>>>>>> >>>>>>>> encode-decode step can be optimized out. This does not >>>>>>>>> mean that it is >>>>>>>>> >>>>>>>> not (logically) present, though. This is exactly the case >>>>>>>>> of native >>>>>>>>> >>>>>>>> Impulse transform. >>>>>>>>> >>>>>>>> >>>>>>>>> >>>>>>>> Now, from that we can conclude that on the boundary >>>>>>>>> between executable >>>>>>>>> >>>>>>>> stages, or between runner (inlined) transform and >>>>>>>>> executable stage, each >>>>>>>>> >>>>>>>> PCollection has to be encoded using SDK coder and >>>>>>>>> immediately decoded by >>>>>>>>> >>>>>>>> runner coder, *unless this can be optimized out* by >>>>>>>>> property c). >>>>>>>>> >>>>>>>> >>>>>>>>> >>>>>>>> This gives us two options where to implement this >>>>>>>>> encode/decode step: >>>>>>>>> >>>>>>>> >>>>>>>>> >>>>>>>> 1) completely inside runner with the possibility to >>>>>>>>> optimize the >>>>>>>>> >>>>>>>> encode/decode step by identity under right circumstances >>>>>>>>> >>>>>>>> >>>>>>>>> >>>>>>>> 2) partly in the runner and partly in the SDK - >>>>>>>>> that is we encode >>>>>>>>> >>>>>>>> elements of PCollection using SDK coder into bytes, pass >>>>>>>>> those to the >>>>>>>>> >>>>>>>> SDK harness and apply a custom decode step there. This >>>>>>>>> works because SDK >>>>>>>>> >>>>>>>> coder encoded elements are in byte[], and that is >>>>>>>>> well-known coder type. >>>>>>>>> >>>>>>>> We again only leverage property c) and optimize the SDK >>>>>>>>> coder encode, >>>>>>>>> >>>>>>>> runner decode step out. >>>>>>>>> >>>>>>>> >>>>>>>>> >>>>>>>> The option 2) is exactly the proposal of TestStream >>>>>>>>> producing byte[] and >>>>>>>>> >>>>>>>> decoding inside SDK-harness, the TestStream is actually >>>>>>>>> inlined >>>>>>>>> >>>>>>>> transform, the elements are produced directly in runner >>>>>>>>> (the SDK coder >>>>>>>>> >>>>>>>> is not known to the runner, but that does not matter, >>>>>>>>> because the >>>>>>>>> >>>>>>>> elements are already encoded by client). >>>>>>>>> >>>>>>>> >>>>>>>>> >>>>>>>> From the above it seems to me, that option 1) should >>>>>>>>> be preferred, because: >>>>>>>>> >>>>>>>> >>>>>>>>> >>>>>>>> i) it is generic, applicable to all inlined >>>>>>>>> transforms, any sources >>>>>>>>> >>>>>>>> >>>>>>>>> >>>>>>>> ii) it is consistent with how things logically work >>>>>>>>> underneath >>>>>>>>> >>>>>>>> >>>>>>>>> >>>>>>>> iii) it offers better room for optimization - >>>>>>>>> option 2) might result >>>>>>>>> >>>>>>>> in cases when the elements are passed from the runner to >>>>>>>>> the SDK-harness >>>>>>>>> >>>>>>>> only for the sake of the decoding from SDK coder and >>>>>>>>> immediately >>>>>>>>> >>>>>>>> encoding back using SDK-coder and returned back to the >>>>>>>>> runner. This >>>>>>>>> >>>>>>>> would be the case when TestStream would be directly >>>>>>>>> consumed by inlined >>>>>>>>> >>>>>>>> (or external) transform. >>>>>>>>> >>>>>>> (1) is not possible if the Coder in question is not known >>>>>>>>> to the >>>>>>>>> >>>>>>> Runner, which is why I proposed (2). >>>>>>>>> >>>>>> There is no particular need for the coder to be known. If >>>>>>>>> transform is >>>>>>>>> >>>>>> to be inlined, what *has* to be known is the SDK-encoded >>>>>>>>> form of the >>>>>>>>> >>>>>> elements. That holds true if: >>>>>>>>> >>>>>> >>>>>>>>> >>>>>> a) either the SDK coder is known, or >>>>>>>>> >>>>>> >>>>>>>>> >>>>>> b) encoded form of the produced elements is known in >>>>>>>>> advance >>>>>>>>> >>>>>> >>>>>>>>> >>>>>> For TestStream it is the case b). For inlined primitive >>>>>>>>> Read (or any >>>>>>>>> >>>>>> other transform which executes code) it is a). >>>>>>>>> >>>>> There's another hitch here for TestStream. For historical >>>>>>>>> reasons, >>>>>>>>> >>>>> coders actually represent two encodings: nested (aka self >>>>>>>>> delimiting) >>>>>>>>> >>>>> and unnested. TestStream elements are given as unnested >>>>>>>>> encoded bytes, >>>>>>>>> >>>>> but the nested encoding is required for sending data to the >>>>>>>>> SDK. The >>>>>>>>> >>>>> runner can't go from <nested encoding> to <unnested >>>>>>>>> encoding> for an >>>>>>>>> >>>>> arbitrary unknown coder. >>>>>>>>> >>>>> >>>>>>>>> >>>>> (Even if it weren't for this complication, to be able to >>>>>>>>> send already >>>>>>>>> >>>>> encoded bytes of an unknown coder to the SDK will also >>>>>>>>> complicate the >>>>>>>>> >>>>> logic in choosing the coder to be used for the channel and >>>>>>>>> sending the >>>>>>>>> >>>>> data, which is some of what you're running into (but can be >>>>>>>>> solved >>>>>>>>> >>>>> differently for inlined reads as the coder can always be >>>>>>>>> known by the >>>>>>>>> >>>>> runner).) >>>>>>>>> >>>>>>>> >>>>>>> >>>>>>> -- >>>>>>> >>>>>>> >>>>>>> Jack McCluskey >>>>>>> SWE - DataPLS PLAT/ Beam Go >>>>>>> RDU >>>>>>> [email protected] >>>>>>> >>>>>>> >>>>>>>
