On Fri, Sep 3, 2021 at 2:40 AM Jan Lukavský <[email protected]> wrote:
>
>
> On 9/3/21 1:06 AM, Robert Bradshaw wrote:
> > On Thu, Sep 2, 2021 at 1:03 AM Jan Lukavský <[email protected]> wrote:
> >> Hi,
> >>
> >> I had some more time thinking about this and I'll try to recap that.
> >> First some invariants:
> >>
> >>    a) each PCollection<T> actually has two coders: an _SDK coder_ and a
> >> _runner coder_. These coders have the property that each one can
> >> _decode_ what the other encoded, but the converse is not true: the
> >> coders cannot _encode_ what the other _decoded_ (in general).
> >>
> >>    b) when a PCollection<T> is computed inside an environment, the
> >> elements are encoded using the SDK coder on the SDK-harness side and
> >> decoded using the runner coder after being received by the runner
> >>
> >>    c) under specific circumstances, the encode-decode step can be
> >> optimized out; this is the case when the SDK coder and all its
> >> subcoders are well-known to the runner (at present, that means that
> >> all of them are in the set of model coders). The reason is that in
> >> this specific situation runner_decode(sdk_encode(X)) = X.
> >> This property is essential.
> > However, in general, X can only pass from the SDK to the runner (or
> > vice versa) in encoded form.
> In general yes, but we are (mostly) talking about transform inlining here,
> so in that particular situation the elements might be passed in decoded form.
> >
> >>    d) it immediately follows from b) that when a PTransform does not run
> >> in an environment (because the transform is runner-native, inlined, or
> >> a source such as Impulse or TestStream), the elements have to be
> >> encoded by the SDK coder and immediately decoded by the runner coder.
> >> That (surprisingly) applies even when the runner is implemented in a
> >> different language than the client SDK, because it implies that the
> >> produced elements must be of a type encoded using model coders
> >> (well-known to the runner; otherwise the SDK would not be able to
> >> consume them). But, due to property c), this encode-decode step can be
> >> optimized out. This does not mean that it is not (logically) present,
> >> though. This is exactly the case of the native Impulse transform.
> >>
> >> Now, from that we can conclude that on the boundary between executable
> >> stages, or between runner (inlined) transform and executable stage, each
> >> PCollection has to be encoded using the SDK coder and immediately decoded
> >> by the runner coder, *unless this can be optimized out* by property c).
> >>
> >> This gives us two options for where to implement this encode/decode step:
> >>
> >>    1) completely inside the runner, with the possibility of replacing the
> >> encode/decode step by the identity under the right circumstances
> >>
> >>    2) partly in the runner and partly in the SDK; that is, we encode the
> >> elements of the PCollection into bytes using the SDK coder, pass those to
> >> the SDK harness, and apply a custom decode step there. This works because
> >> SDK-coder-encoded elements are byte[], and byte[] has a well-known coder.
> >> We again only leverage property c) and optimize out the SDK coder encode,
> >> runner decode step.
> >>
> >> Option 2) is exactly the proposal of TestStream producing byte[] and
> >> decoding inside the SDK harness. TestStream is actually an inlined
> >> transform: the elements are produced directly in the runner (the SDK
> >> coder is not known to the runner, but that does not matter, because the
> >> elements are already encoded by the client).
> >>
> >> From the above, it seems to me that option 1) should be preferred,
> >> because:
> >>
> >>    i) it is generic, applicable to all inlined transforms and any sources
> >>
> >>    ii) it is consistent with how things logically work underneath
> >>
> >>    iii) it offers more room for optimization: option 2) might result
> >> in cases where the elements are passed from the runner to the SDK harness
> >> only to be decoded with the SDK coder, immediately encoded back with the
> >> SDK coder, and returned to the runner. This would be the case when
> >> TestStream is directly consumed by an inlined (or external) transform.
> > (1) is not possible if the Coder in question is not known to the
> > Runner, which is why I proposed (2).
>
> There is no particular need for the coder to be known. If a transform is
> to be inlined, what *has* to be known is the SDK-encoded form of the
> elements. That holds true if:
>
>   a) either the SDK coder is known, or
>
>   b) the encoded form of the produced elements is known in advance
>
> For TestStream it is the case b). For inlined primitive Read (or any
> other transform which executes code) it is a).
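To make invariant a) concrete, here is a toy sketch in Python. `Point`, `SdkPointCoder` and `RunnerOpaqueCoder` are hypothetical stand-ins, not Beam classes: the SDK side knows a custom type, while the runner only sees a length prefix plus opaque bytes. Each side can decode what the other encoded, but the runner's decoded value is just bytes rather than X, and neither side could encode the other's decoded value.

```python
from __future__ import annotations

from dataclasses import dataclass


def encode_varint(n: int) -> bytes:
    """Unsigned varint encoding (non-negative ints only)."""
    out = bytearray()
    while True:
        bits, n = n & 0x7F, n >> 7
        if n:
            out.append(bits | 0x80)
        else:
            out.append(bits)
            return bytes(out)


def decode_varint(buf: bytes, pos: int = 0) -> tuple[int, int]:
    """Return (value, position just after the varint)."""
    result, shift = 0, 0
    while True:
        b = buf[pos]
        pos += 1
        result |= (b & 0x7F) << shift
        if not b & 0x80:
            return result, pos
        shift += 7


@dataclass(frozen=True)
class Point:
    x: int
    y: int


class SdkPointCoder:
    """Hypothetical custom coder known only to the SDK: length prefix + two varints."""

    def encode(self, p: Point) -> bytes:
        body = encode_varint(p.x) + encode_varint(p.y)
        return encode_varint(len(body)) + body

    def decode(self, buf: bytes) -> Point:
        n, pos = decode_varint(buf)
        body = buf[pos:pos + n]
        x, i = decode_varint(body)
        y, _ = decode_varint(body, i)
        return Point(x, y)


class RunnerOpaqueCoder:
    """Hypothetical runner-side view of the same stream: length prefix + opaque bytes."""

    def encode(self, payload: bytes) -> bytes:
        return encode_varint(len(payload)) + payload

    def decode(self, buf: bytes) -> bytes:
        n, pos = decode_varint(buf)
        return buf[pos:pos + n]


sdk, runner = SdkPointCoder(), RunnerOpaqueCoder()
x = Point(3, 300)
wire = sdk.encode(x)            # produced in the SDK harness
opaque = runner.decode(wire)    # the runner can decode it, but only to bytes
assert opaque != x              # runner_decode(sdk_encode(X)) != X for an unknown coder
assert sdk.decode(runner.encode(opaque)) == x  # the SDK decodes what the runner encodes
```

Since the runner only ever handles the SDK-encoded form, it can carry elements whose encoded form is known in advance (case b) above) without understanding the coder itself.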

There's another hitch here for TestStream. For historical reasons,
coders actually represent two encodings: nested (aka self delimiting)
and unnested. TestStream elements are given as unnested encoded bytes,
but the nested encoding is required for sending data to the SDK. The
runner can't derive the <nested encoding> from the <unnested encoding>
for an arbitrary unknown coder.
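A toy illustration of why the runner is stuck, using two hypothetical coders (not Beam's classes, though loosely modeled on a bytes coder that length-prefixes only in the nested context and a varint coder whose encoding is already self-delimiting). Both produce the same unnested bytes for different values, yet their nested encodings differ, so a runner holding only the unnested bytes from TestStream cannot tell which nested form the SDK expects.

```python
from __future__ import annotations


def encode_varint(n: int) -> bytes:
    """Unsigned varint encoding (non-negative ints only)."""
    out = bytearray()
    while True:
        bits, n = n & 0x7F, n >> 7
        if n:
            out.append(bits | 0x80)
        else:
            out.append(bits)
            return bytes(out)


class ToyBytesCoder:
    """Raw bytes are not self-delimiting, so the nested form adds a length prefix."""

    def encode(self, v: bytes) -> bytes:         # unnested (outer) context
        return v

    def encode_nested(self, v: bytes) -> bytes:  # nested (self-delimiting) context
        return encode_varint(len(v)) + v


class ToyVarIntCoder:
    """Varints are already self-delimiting, so both forms coincide."""

    def encode(self, v: int) -> bytes:
        return encode_varint(v)

    def encode_nested(self, v: int) -> bytes:
        return encode_varint(v)


# The same unnested bytes, as a TestStream payload might carry them...
unnested = b"\x05"
assert ToyBytesCoder().encode(b"\x05") == unnested
assert ToyVarIntCoder().encode(5) == unnested

# ...map to different nested encodings depending on the (unknown) coder.
assert ToyBytesCoder().encode_nested(b"\x05") == b"\x01\x05"
assert ToyVarIntCoder().encode_nested(5) == b"\x05"
```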

(Even if it weren't for this complication, being able to send already
encoded bytes of an unknown coder to the SDK would also complicate the
logic for choosing the coder to be used for the channel and for sending
the data, which is some of what you're running into (but that can be
solved differently for inlined reads, as the coder can always be known
by the runner).)
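A rough sketch of that channel-coder decision, loosely in the spirit of how portable runners length-prefix coders they do not understand. The `CoderSpec` tree, the helper names and the `KNOWN_URNS` set are all assumptions for illustration (the URN strings follow Beam's `beam:coder:...:v1` naming, but the set is deliberately incomplete). If every component is known, both sides use the coder as-is and the encode/decode step can be elided; otherwise the SDK wraps the unknown part in a length prefix and the runner reads the same bytes as length-prefixed opaque bytes.

```python
from __future__ import annotations

from dataclasses import dataclass

LP = "beam:coder:length_prefix:v1"
BYTES = "beam:coder:bytes:v1"

# Hypothetical, incomplete subset of coders the runner understands.
KNOWN_URNS = {
    BYTES,
    LP,
    "beam:coder:varint:v1",
    "beam:coder:string_utf8:v1",
    "beam:coder:kv:v1",
    "beam:coder:iterable:v1",
}


@dataclass(frozen=True)
class CoderSpec:
    urn: str
    components: tuple[CoderSpec, ...] = ()


def is_fully_known(c: CoderSpec) -> bool:
    """True if the coder and all of its subcoders are known to the runner."""
    return c.urn in KNOWN_URNS and all(is_fully_known(s) for s in c.components)


def runner_side(c: CoderSpec) -> CoderSpec:
    """Runner view: every unknown subtree becomes length_prefix(bytes)."""
    if is_fully_known(c):
        return c
    if c.urn not in KNOWN_URNS or c.urn == LP:
        # Unknown (or already length-prefixed unknown) payload: opaque bytes.
        return CoderSpec(LP, (CoderSpec(BYTES),))
    return CoderSpec(c.urn, tuple(runner_side(s) for s in c.components))


def sdk_side(c: CoderSpec) -> CoderSpec:
    """SDK view: unknown subtrees are kept but wrapped in a length prefix."""
    if is_fully_known(c) or c.urn == LP:
        return c
    if c.urn not in KNOWN_URNS:
        return CoderSpec(LP, (c,))
    return CoderSpec(c.urn, tuple(sdk_side(s) for s in c.components))


custom = CoderSpec("hypothetical:coder:my_type")
kv = CoderSpec("beam:coder:kv:v1", (CoderSpec("beam:coder:string_utf8:v1"), custom))
print(runner_side(kv))
print(sdk_side(kv))
```

The design choice worth noting is that the length prefix is what lets both views agree on element boundaries even though only the SDK can interpret the payload.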
