Unfortunately the most basic coders (e.g. bytes, string, kv, iterable)
care about Context because they predated this deprecation, and
changing coders is hard (due to no way to update the encoding for a
streaming pipeline).
This is unrelated, but regarding the reluctance to change coders because of pipeline upgrades: we break upgrade compatibility quite often, at least for some runners, most recently in [1].

It is currently the latter for runners using this code (which not all
do, e.g. the ULR and Dataflow runners). I don't think we want to
ossify this decision as part of the spec. (Note that even what's
"known" and "unknown" can change from runner to runner.)
This is interesting and unexpected for me. How do runners decide about how they encode elements between SDK harness and the runner? How do they inform the SDK harness about this decision? My impression was that this is well-defined at the model level. If not, then we have the reason for misunderstanding in this conversation. :-)

 Jan

[1] https://lists.apache.org/thread.html/r51ee0bbaba2dcef13524a189c1f579f209483418a1568acff0e2c789%40%3Cdev.beam.apache.org%3E

On 9/4/21 7:32 PM, Robert Bradshaw wrote:
On Sat, Sep 4, 2021 at 6:52 AM Jan Lukavský <[email protected]> wrote:
On 9/3/21 9:50 PM, Robert Bradshaw wrote:

On Fri, Sep 3, 2021 at 11:42 AM Jan Lukavský<[email protected]>  wrote:
Hi Robert,

There's another hitch here for TestStream. For historical reasons,
coders actually represent two encodings: nested (aka self delimiting)
and unnested. TestStream elements are given as unnested encoded bytes,
but the nested encoding is required for sending data to the SDK. The
runner can't go from <nested encoding> to <unnested encoding> for an
arbitrary unknown coder.
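
To make the nested/unnested distinction concrete, here is a minimal sketch in plain Python. It does not use the Beam API: ToyBytesCoder and encode_varint are hypothetical stand-ins, and the base-128 varint length prefix is an assumption about how a self-delimiting bytes encoding might look.

```python
def encode_varint(n: int) -> bytes:
    """Unsigned base-128 varint (assumed stand-in for the length prefix)."""
    if n < 0:
        raise ValueError("length must be non-negative")
    out = bytearray()
    while True:
        low = n & 0x7F
        n >>= 7
        if n:
            out.append(low | 0x80)  # more bytes follow
        else:
            out.append(low)
            return bytes(out)


class ToyBytesCoder:
    """Hypothetical bytes coder with two encodings, selected by `nested`."""

    def encode(self, value: bytes, nested: bool) -> bytes:
        if nested:
            # Self-delimiting: a reader can find where the element ends.
            return encode_varint(len(value)) + value
        # Unnested: the element is assumed to fill the rest of the buffer.
        return value


coder = ToyBytesCoder()
unnested = coder.encode(b"abc", nested=False)
nested = coder.encode(b"abc", nested=True)
```

For a coder the runner knows, going from one form to the other is trivial. For an unknown coder the runner cannot tell how the two forms relate, which is the hitch described above.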

(Even if it weren't for this complication, to be able to send already
encoded bytes of an unknown coder to the SDK will also complicate the
logic in choosing the coder to be used for the channel and sending the
data, which is some of what you're running into (but can be solved
differently for inlined reads as the coder can always be known by the
runner).)
It is hard for me to argue with "historical reasons". But - the "nested"
and "unnested" coders look very similar to SDK-coder and runner-coder
spaces.
Unfortunately, they're actually orthogonal to that.
Hm, do you mean the Context passed to the encode/decode method? [1] That
seems to be deprecated, I assume that most coders use the default
implementation and simply ignore the Context?
Unfortunately the most basic coders (e.g. bytes, string, kv, iterable)
care about Context because they predated this deprecation, and
changing coders is hard (due to no way to update the encoding for a
streaming pipeline).

Even if not - whether or
not the elements are encoded using NESTED Context or UNNESTED Context
should be part of the contract of TestStream, right? Most likely it is
the UNNESTED one, if I understand correctly what that does. Under what
conditions is the deprecated encode/decode method used?
Yes, it's the UNNESTED one.

[1]
https://github.com/apache/beam/blob/c4e0b4ac0777f37f5eb775a8a83c56f66b3baac3/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L134

The runner's responsibility is not to go from "<nested
encoding>" (SDK coder) to "<unnested encoding>" for arbitrary coder.
That is really impossible. But a coder is a function, right? Function
maps from universe A to universe B (in general). TestStream provides a
set of elements, and these elements are the "universe". For those
elements it also provides the encoded form, which can be interpreted as
the definition of the coder.
The problem here is that there is not "the encoded form" for a Coder
but two encoded forms, and we have the wrong one. Things could be made
to work if we had the other.
Which two encoded forms do you refer to? Elements encoded by both the
SDK-coder and runner-coder (and I ignore the Context here once again)
have the same binary representation (which they must have, otherwise it
would be impossible to decode elements coming from the runner to the
SDK-harness or vice-versa).
Therefore - technically (and formally) -
the SDK coder for the TestStream is known to the runner, regardless of
the language the runner is written in.

To move this discussion forward, I think we should look for answers to
the following questions:

    a) do we have any clues that show, that the proposed "in runner"
solution will not work?
OK, thinking about it some more, in the TestStream, we can use the
happy coincidence that

      LengthPrefixed(C).encode(x, nested=True) ==
          VarLong.encode(len(C.encode(x, nested=False))) || C.encode(x, nested=False)

(where || denotes concatenation) and the fact that we have

      C.encode(x, nested=False)

in hand.
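
The identity above can be checked with a small plain-Python sketch. Everything here is a toy: ToyUtf8Coder, LengthPrefixed, encode_varint and length_prefix are hypothetical names, and the varint format is an assumed stand-in for VarLong. The point is only that a runner holding C.encode(x, nested=False) can produce the length-prefixed wire form without knowing C.

```python
def encode_varint(n: int) -> bytes:
    """Unsigned base-128 varint (assumed stand-in for VarLong.encode)."""
    out = bytearray()
    while True:
        low = n & 0x7F
        n >>= 7
        if n:
            out.append(low | 0x80)
        else:
            out.append(low)
            return bytes(out)


class ToyUtf8Coder:
    """Hypothetical coder C, playing the role of a coder unknown to the runner."""

    def encode(self, value: str, nested: bool) -> bytes:
        raw = value.encode("utf-8")
        return encode_varint(len(raw)) + raw if nested else raw


class LengthPrefixed:
    """Toy length-prefix wrapper: varint length, then the unnested payload."""

    def __init__(self, inner):
        self.inner = inner

    def encode(self, value, nested: bool) -> bytes:
        # Already self-delimiting, so `nested` does not change the output.
        payload = self.inner.encode(value, nested=False)
        return encode_varint(len(payload)) + payload


def length_prefix(unnested_payload: bytes) -> bytes:
    """What a runner could do with the bytes it has in hand, without C."""
    return encode_varint(len(unnested_payload)) + unnested_payload


C = ToyUtf8Coder()
x = "a" * 300
in_hand = C.encode(x, nested=False)  # what the TestStream payload carries
wire = LengthPrefixed(C).encode(x, nested=True)
```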

A possible fix here for the OP's question is that when rehydrating the
TestStream transform it must behave differently according to the coder
used in the subsequent channel (e.g. for known coders, it decodes the
elements and emits them directly, but for unknown coders, it prefixes
them with their length and emits byte strings). It gets more
complicated for nested coders, e.g. for a KV<known-coder,
unknown-coder> the channel might be LP(KV<known-coder, unknown-coder>)
or KV<known-coder, LP(unknown-coder)>, which have different encodings
(and the latter, which is the default, requires transcoding the bytes
to inject the length in the middle which is found by decoding the
first component). As well as getting more complex, this really seems
to violate the spirit of separation of concerns.
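
A rough plain-Python sketch of the two channel choices for a KV, under stated assumptions: the known key coder is a toy varint coder, the unknown value coder's bytes are opaque, and a KV is laid out as the key (self-delimiting) followed by the value. All function names are hypothetical.

```python
def encode_varint(n: int) -> bytes:
    """Unsigned base-128 varint; doubles as the toy 'known' key coder."""
    out = bytearray()
    while True:
        low = n & 0x7F
        n >>= 7
        if n:
            out.append(low | 0x80)
        else:
            out.append(low)
            return bytes(out)


def decode_varint(buf: bytes, pos: int = 0):
    """Return (value, position just past the varint)."""
    result, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def to_lp_of_kv(element: bytes) -> bytes:
    """Channel LP(KV<known, unknown>): prefix the whole element, no decoding."""
    return encode_varint(len(element)) + element


def to_kv_with_lp_value(element: bytes) -> bytes:
    """Channel KV<known, LP(unknown)>: the key must be decoded to find where
    the value starts, so the length can be injected in the middle."""
    _, key_end = decode_varint(element)
    key_bytes, value_bytes = element[:key_end], element[key_end:]
    return key_bytes + encode_varint(len(value_bytes)) + value_bytes


# Unnested KV element as assumed here: key 7, then opaque value bytes.
element = encode_varint(7) + b"xyz"
lp_of_kv = to_lp_of_kv(element)
kv_with_lp_value = to_kv_with_lp_value(element)
```

The two results differ, and only the second one requires the runner to understand the key coder, which illustrates why the transcoding depends on the channel the runner picked.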
How do we make the decision if the channel is LP<KV<..>> or
KV<LP<unknown>, known>? From my understanding it is always the latter,
because of [2].

[2]
https://github.com/apache/beam/blob/c4e0b4ac0777f37f5eb775a8a83c56f66b3baac3/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L48
It is currently the latter for runners using this code (which not all
do, e.g. the ULR and Dataflow runners). I don't think we want to
ossify this decision as part of the spec. (Note that even what's
"known" and "unknown" can change from runner to runner.)

    b) do we think, that it will not be robust enough to incorporate the
other use-cases (like generic transform inlining, taking into account
that this applies only to runners that are written in the same language
as the submitting SDK, because otherwise, there is nothing to inline)?
Being in the same language is not a prerequisite to "inlining," e.g.
the PubSub source on Dataflow is recognized as such and not executed
as SDK code but natively.
Agree, that is actually exactly what happens with the TestStream. The
transform need not be in the same language, as long as it is completely
understood by the runner, including the SDK-coder (either explicitly -
which might be due to the PCollection coder being composed of well-known
coders only, or implicitly like in the case of TestStream, where the
elements are encoded using the SDK coder).
It is more likely that inlining occurs in the same language if there
are UDFs involved.

I'm convinced, that the TestStream-decode expansion solution is an
ad-hoc solution to a generic problem, which is why I'm still bothering
this mailing list with my emails on this. :-)

WDYT?
While not a solution to the general problem, I think the
TestStream-only-does-bytes simplifies its definition (primitives
should have as simple/easy to implement definitions as possible) and
brings it closer to the other root we have: Impulse. (We could go a
step further and rather than emitting encoded elements, with the data
in the proto itself, it emits sequence numbers, and a subsequent ParDo
maps those to concrete elements (e.g. via an in-memory map), but that
further step doesn't buy much...)

Only runners that want to do inlining would have to take on the
complexity of a fully generic solution.
I think that if the simplification brings something, we can do that, but
I'd like to understand why we cannot (or should not) use the generic
solution. I think it definitely *should* be possible to use a generic
solution, because otherwise the solution would not be generic. And it
would imply, that we are unable to do generic transform inlining, which
I would find really strange. That would immediately mean, that we are
unable to construct classical runner as a special case of the portable
one, which would be bad I think.

Are the elements in the TestStreamPayload encoded with the pure SDK-coder,
or does this go through the LengthPrefixUnknownCoders logic? If not,
then the problem would be there, because that means, that the SDK-coder
cannot be (implicitly) defined in the runner. If the elements would be
encoded using LP, then it would be possible to decode them using
runner-coder and the problem should be solved, or am I still missing
some key parts?
Yes, the problem is precisely that there are (unspecified) constraints
on the coder used by the TestStreamPayload. Just requiring that it be
length prefixed is not enough, you have to make constraints on
sometimes pushing down the length prefixing if it's a composite (like
a KV) that depend on what the runner is expected to support in terms
of composites and/or the choices it chooses for the channel (and the
runner, not knowing the coder, can't transcode between these choices).

The simpler solution is to constrain this coder to just be byte[]
rather than let it be a little bit flexible (but not wholly flexible).

As for a fully generic solution, I think the issues encountered with
inlining Read vs. TestStream are related to this, but not really the
same. With TestStream the Runner has an encoded representation of the
elements provided by the SDK and has no SDK
representation/execution, whereas with the Reads one has unencoded
elements in hand and a Coder that is understood by both (so long as
the channel can be negotiated correctly). FWIW, I think the proper
solution to inlining a Read (or other Transform that would typically
be executed in the SDK) is to treat it as a special environment (where
we know logically it can work) and then elide, as possible, the
various encodings, grpc calls, etc. that are unneeded as everything is
in process.

On 9/3/21 7:03 PM, Robert Bradshaw wrote:
On Fri, Sep 3, 2021 at 2:40 AM Jan Lukavský<[email protected]>  wrote:
On 9/3/21 1:06 AM, Robert Bradshaw wrote:
On Thu, Sep 2, 2021 at 1:03 AM Jan Lukavský<[email protected]>  wrote:
Hi,

I had some more time thinking about this and I'll try to recap that.
First some invariants:

      a) each PCollection<T> has actually two coders - an _SDK coder_ and a
_runner coder_. These coders have the property, that each one can
_decode_ what the other encoded, but the opposite is not true, the
coders cannot _encode_ what the other _decoded_ (in general).

      b) when a PCollection<T> is computed inside an environment, the
elements are encoded using SDK coder on the side of SDK-harness and
decoded using runner coder after receiving in the runner

      c) under specific circumstances, the encode-decode step can be
optimized out, that is the case where the SDK coder and all its
subcoders are all well-known to the runner (in the present, that means
that all the parts present in the model coders set). The reason for that
is that in this specific situation runner_decode(sdk_encode(X)) = X.
This property is essential.
However, in general, X can only pass from the SDK to the runner (or
vice versa) in encoded form.
In general yes, but we are (mostly) talking transform inlining here, so
in that particular situation, the elements might be passed in decoded form.
      d) from b) immediately follows, that when a PTransform does not run in
an environment (and this might be due to the transform being runner
native, inlined, source (e.g. Impulse or TestStream)) the elements have
to be encoded by SDK coder, immediately followed by decode by runner
coder. That (surprisingly) applies even to situations when runner is
implemented using different language than the client SDK, because it
implies that the type of produced elements must be one of types encoded
using model coders (well-known to the runner, otherwise the SDK will not
be able to consume it). But - due to property c) - this means that this
encode-decode step can be optimized out. This does not mean that it is
not (logically) present, though. This is exactly the case of native
Impulse transform.

Now, from that we can conclude that on the boundary between executable
stages, or between runner (inlined) transform and executable stage, each
PCollection has to be encoded using SDK coder and immediately decoded by
runner coder, *unless this can be optimized out* by property c).
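
As an illustration of invariant a), here is a toy plain-Python sketch. It assumes the wire form is a varint length followed by the payload, that the runner coder treats the payload as opaque bytes, and that the SDK coder is a hypothetical JSON-based ToyJsonCoder the runner cannot execute. None of these names come from Beam.

```python
import json


def encode_varint(n: int) -> bytes:
    out = bytearray()
    while True:
        low = n & 0x7F
        n >>= 7
        if n:
            out.append(low | 0x80)
        else:
            out.append(low)
            return bytes(out)


def decode_varint(buf: bytes, pos: int = 0):
    result, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


class ToyJsonCoder:
    """Hypothetical custom coder, available only on the SDK side."""

    def encode(self, value) -> bytes:
        return json.dumps(value, sort_keys=True).encode("utf-8")

    def decode(self, data: bytes):
        return json.loads(data.decode("utf-8"))


def sdk_encode(coder, value) -> bytes:
    payload = coder.encode(value)
    return encode_varint(len(payload)) + payload


def sdk_decode(coder, wire: bytes):
    length, start = decode_varint(wire)
    return coder.decode(wire[start:start + length])


def runner_decode(wire: bytes) -> bytes:
    """Runner coder: strips the length prefix, keeps the payload opaque."""
    length, start = decode_varint(wire)
    return wire[start:start + length]


def runner_encode(opaque: bytes) -> bytes:
    return encode_varint(len(opaque)) + opaque


coder = ToyJsonCoder()
element = {"x": 1, "y": 2}
wire = sdk_encode(coder, element)
opaque = runner_decode(wire)  # bytes, not the original element
round_tripped = sdk_decode(coder, runner_encode(opaque))
```

In this sketch each side can decode what the other encoded, but runner_decode(sdk_encode(X)) is only equal to X when the coder is known to the runner, which is the condition in property c).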

This gives us two options where to implement this encode/decode step:

      1) completely inside runner with the possibility to optimize the
encode/decode step by identity under right circumstances

      2) partly in the runner and partly in the SDK - that is we encode
elements of PCollection using SDK coder into bytes, pass those to the
SDK harness and apply a custom decode step there. This works because SDK
coder encoded elements are in byte[], and that is well-known coder type.
We again only leverage property c) and optimize the SDK coder encode,
runner decode step out.

The option 2) is exactly the proposal of TestStream producing byte[] and
decoding inside SDK-harness, the TestStream is actually inlined
transform, the elements are produced directly in runner (the SDK coder
is not known to the runner, but that does not matter, because the
elements are already encoded by client).

From the above it seems to me that option 1) should be preferred, because:

      i) it is generic, applicable to all inlined transforms, any sources

      ii) it is consistent with how things logically work underneath

      iii) it offers better room for optimization - option 2) might result
in cases when the elements are passed from the runner to the SDK-harness
only for the sake of the decoding from SDK coder and immediately
encoding back using SDK-coder and returned back to the runner. This
would be the case when TestStream would be directly consumed by inlined
(or external) transform.
(1) is not possible if the Coder in question is not known to the
Runner, which is why I proposed (2).
There is no particular need for the coder to be known. If transform is
to be inlined, what *has* to be known is the SDK-encoded form of the
elements. That holds true if:

     a) either the SDK coder is known, or

     b) encoded form of the produced elements is known in advance

For TestStream it is the case b). For inlined primitive Read (or any
other transform which executes code) it is a).
There's another hitch here for TestStream. For historical reasons,
coders actually represent two encodings: nested (aka self delimiting)
and unnested. TestStream elements are given as unnested encoded bytes,
but the nested encoding is required for sending data to the SDK. The
runner can't go from <nested encoding> to <unnested encoding> for an
arbitrary unknown coder.

(Even if it weren't for this complication, to be able to send already
encoded bytes of an unknown coder to the SDK will also complicate the
logic in choosing the coder to be used for the channel and sending the
data, which is some of what you're running into (but can be solved
differently for inlined reads as the coder can always be known by the
runner).)
