Reuven
On Mon, Sep 6, 2021 at 1:29 AM Jan Lukavský <[email protected]> wrote:
> Unfortunately the most basic coders (e.g.
bytes, string, kv, iterable)
> care about Context because they predated
this deprecation, and
> changing coders is hard (due to no way to
update the encoding for a
> streaming pipeline).
This is unrelated, but - regarding changing
coders due to concerns about
pipeline upgrades, we break this quite
often, at least for some runners.
Most recently [1].
> It is currently the latter for runners
using this code (which not all
> do, e.g. the ULR and Dataflow runners). I
don't think we want to
> ossify this decision as part of the spec.
(Note that even what's
> "known" and "unknown" can change from
runner to runner.)
This is interesting and unexpected for me.
How do runners decide about
how they encode elements between SDK harness
and the runner? How do they
inform the SDK harness about this decision?
My impression was that this
is well-defined at the model level. If not,
then we have the reason for
misunderstanding in this conversation. :-)
Jan
[1]
https://lists.apache.org/thread.html/r51ee0bbaba2dcef13524a189c1f579f209483418a1568acff0e2c789%40%3Cdev.beam.apache.org%3E
On 9/4/21 7:32 PM, Robert Bradshaw wrote:
> On Sat, Sep 4, 2021 at 6:52 AM Jan Lukavský <[email protected]> wrote:
>> On 9/3/21 9:50 PM, Robert Bradshaw wrote:
>>
>>> On Fri, Sep 3, 2021 at 11:42 AM Jan Lukavský <[email protected]> wrote:
>>>> Hi Robert,
>>>>
>>>>> There's another hitch here for
TestStream. For historical reasons,
>>>>> coders actually represent two
encodings: nested (aka self delimiting)
>>>>> and unnested. TestStream elements are
given as unnested encoded bytes,
>>>>> but the nested encoding is required
for sending data to the SDK. The
>>>>> runner can't go from <nested encoding>
to <unnested encoding> for an
>>>>> arbitrary unknown coder.
>>>>>
>>>>> (Even if it weren't for this
complication, to be able to send already
>>>>> encoded bytes of an unknown coder to
the SDK will also complicate the
>>>>> logic in choosing the coder to be used
for the channel and sending the
>>>>> data, which is some of what you're
running into (but can be solved
>>>>> differently for inlined reads as the
coder can always be known by the
>>>>> runner).)
>>>> It is hard for me to argue with
"historical reasons". But - the "nested"
>>>> and "unnested" coders look very similar
to SDK-coder and runner-coder
>>>> spaces.
>>> Unfortunately, they're actually
orthogonal to that.
>> Hm, do you mean the Context passed to the
encode/decode method? [1] That
>> seems to be deprecated, I assume that
most coders use the default
>> implementation and simply ignore the Context?
> Unfortunately the most basic coders (e.g.
bytes, string, kv, iterable)
> care about Context because they predated
this deprecation, and
> changing coders is hard (due to no way to
update the encoding for a
> streaming pipeline).
>
>> Even if not - whether or
>> not the elements are encoded using NESTED
Context or UNNESTED Context
>> should be part of the contract of
TestStream, right? Most likely it is
>> the UNNESTED one, if I understand
correctly what that does. Under what
>> conditions is the deprecated
encode/decode method used?
> Yes, it's the UNNESTED one.
>
>> [1]
>>
https://github.com/apache/beam/blob/c4e0b4ac0777f37f5eb775a8a83c56f66b3baac3/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java#L134
>>
>>>> The runner's responsibility is not to
go from "<nested
>>>> encoding>" (SDK coder) to "<unnested
encoding>" for arbitrary coder.
>>>> That is really impossible. But a coder
is a function, right? Function
>>>> maps from universe A to universe B (in
general). TestStream provides a
>>>> set of elements, and these elements are
the "universe". For those
>>>> elements it also provides the encoded
form, which can be interpreted as
>>>> the definition of the coder.
>>> The problem here is that there is not
"the encoded form" for a Coder
>>> but two encoded forms, and we have the
wrong one. Things could be made
>>> to work if we had the other.
>> Which two encoded forms do you refer to?
Elements encoded by both the
>> SDK-coder and runner-coder (and I ignore
the Context here once again)
>> have the same binary representation
(which they must have, otherwise it
>> would be impossible to decode elements
coming from the runner to the
>> SDK-harness or vice-versa).
>>>> Therefore - technically (and formally) -
>>>> the SDK coder for the TestStream is
known to the runner, regardless of
>>>> the language the runner is written in.
>>>>
>>>> To move this discussion forward, I
think we should look for answers to
>>>> the following questions:
>>>>
>>>> a) do we have any clues that show,
that the proposed "in runner"
>>>> solution will not work?
>>> OK, thinking about it some more, in the
TestStream, we can use the
>>> happy coincidence that
>>>
>>> LengthPrefixed(C).encode(x, nested=True) ==
>>>   VarLong.encode(len(C.encode(x, nested=False))) || C.encode(x, nested=False)
>>>
>>> (where || denotes concatenation) and the
fact that we have
>>>
>>> C.encode(x, nested=False)
>>>
>>> in hand.
>>>
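A minimal sketch of the identity above, assuming VarLong uses the usual base-128 varint layout for non-negative values. The helper names `encode_varint` and `length_prefix` are hypothetical and are not Beam APIs; the point is only that the nested, length-prefixed form can be built from the unnested bytes without knowing the coder C.

```python
def encode_varint(n: int) -> bytes:
    """Base-128 varint: 7 payload bits per byte, high bit set on all but the last."""
    if n < 0:
        raise ValueError("this sketch only handles non-negative lengths")
    out = bytearray()
    while True:
        bits = n & 0x7F
        n >>= 7
        if n:
            out.append(bits | 0x80)  # more bytes follow
        else:
            out.append(bits)         # final byte
            return bytes(out)


def length_prefix(unnested: bytes) -> bytes:
    """Build the length-prefixed form from C.encode(x, nested=False).

    Only the already-encoded bytes are needed, never the coder itself.
    """
    return encode_varint(len(unnested)) + unnested
```

For example, `length_prefix(b"abc")` yields the three payload bytes preceded by a single length byte.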
>>> A possible fix here for the OP's question is that when rehydrating the
>>> TestStream transform it must behave differently according to the coder
>>> used in the subsequent channel (e.g. for known coders, it decodes the
>>> elements and emits them directly, but for unknown coders, it prefixes
>>> them with their length and emits byte strings). It gets more
>>> complicated for nested coders, e.g. for a KV<known-coder,
>>> unknown-coder> the channel might be LP(KV<known-coder, unknown-coder>)
>>> or KV<known-coder, LP(unknown-coder)>, which have different encodings
>>> (and the latter, which is the default, requires transcoding the bytes
>>> to inject the length in the middle, which is found by decoding the
>>> first component). As well as getting more complex, this really seems
>>> to violate the spirit of separation of concerns.
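To make the KV case concrete, here is a toy transcoder under stated assumptions: the known key coder writes a varint length followed by the key bytes, and in the unnested KV encoding the unknown value simply runs to the end of the buffer. The function names are hypothetical and the layout is illustrative rather than a statement of the exact Beam wire format.

```python
def encode_varint(n: int) -> bytes:
    """Base-128 varint for a non-negative integer."""
    out = bytearray()
    while True:
        bits = n & 0x7F
        n >>= 7
        if n:
            out.append(bits | 0x80)
        else:
            out.append(bits)
            return bytes(out)


def decode_varint(buf: bytes, pos: int = 0):
    """Return (value, position just past the varint)."""
    result = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def transcode_kv(unnested_kv: bytes) -> bytes:
    """KV<known, unknown> (unnested) -> KV<known, LP(unknown)>.

    The key has to be decoded far enough to find where it ends; only then
    can the length of the opaque value be injected in the middle.
    """
    key_len, pos = decode_varint(unnested_kv)
    key_end = pos + key_len
    if key_end > len(unnested_kv):
        raise ValueError("buffer is shorter than the declared key length")
    value = unnested_kv[key_end:]
    return unnested_kv[:key_end] + encode_varint(len(value)) + value
```

Wrapping the whole pair instead, as in LP(KV<known-coder, unknown-coder>), would only need the prefix at the front, which is why the two channel choices produce different bytes.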
>> How do we make the decision if the
channel is LP<KV<..>> or
>> KV<LP<unknown>, known>? From my
understanding it is always the latter,
>> because of [2].
>>
>> [2]
>>
https://github.com/apache/beam/blob/c4e0b4ac0777f37f5eb775a8a83c56f66b3baac3/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L48
> It is currently the latter for runners
using this code (which not all
> do, e.g. the ULR and Dataflow runners). I
don't think we want to
> ossify this decision as part of the spec.
(Note that even what's
> "known" and "unknown" can change from
runner to runner.)
>
>>>> b) do we think that it will not be robust enough to incorporate the
>>>> other use-cases (like generic transform inlining, taking into account
>>>> that this applies only to runners that are written in the same language
>>>> as the submitting SDK, because otherwise there is nothing to inline)?
>>> Being in the same language is not a
prerequisite to "inlining," e.g.
>>> the PubSub source on Dataflow is
recognized as such and not executed
>>> as SDK code but natively.
>> Agree, that is actually exactly what happens with the TestStream. The
>> transform need not be in the same language, as long as it is completely
>> understood by the runner, including the SDK-coder (either explicitly,
>> which might be due to the PCollection coder being composed of well-known
>> coders only, or implicitly, as in the case of TestStream, where the
>> elements are encoded using the SDK coder).
>>> It is more likely that inlining occurs
in the same language if there
>>> are UDFs involved.
>>>
>>>> I'm convinced, that the
TestStream-decode expansion solution is an
>>>> ad-hoc solution to a generic problem,
which is why I'm still bothering
>>>> this mailing list with my emails on
this. :-)
>>>>
>>>> WDYT?
>>> While not a solution to the general
problem, I think the
>>> TestStream-only-does-bytes simplifies
its definition (primitives
>>> should have as simple/easy to implement
definitions as possible) and
>>> brings it closer to the other root we
have: Impulse. (We could go a
>>> step further and rather than emitting
encoded elements, with the data
>>> in the proto itself, it emits sequence
numbers, and a subsequent ParDo
>>> maps those to concrete elements (e.g.
via an in-memory map), but that
>>> further step doesn't buy much...)
>>>
>>> Only runners that want to do inlining
would have to take on the
>>> complexity of a fully generic solution.
>> I think that if the simplification brings
something, we can do that, but
>> I'd like to understand why we cannot (or
should not) use the generic
>> solution. I think it definitely *should*
be possible to use a generic
>> solution, because otherwise the solution
would not be generic. And it
>> would imply, that we are unable to do
generic transform inlining, which
>> I would find really strange. That would
immediately mean, that we are
>> unable to construct classical runner as a
special case of the portable
>> one, which would be bad I think.
>>
>> Are the elements in the TestStreamPayload encoded with the pure SDK-coder,
>> or does this go through the LengthPrefixUnknownCoders logic? If not,
>> then the problem would be there, because that means that the SDK-coder
>> cannot be (implicitly) defined in the runner. If the elements were
>> encoded using LP, then it would be possible to decode them using the
>> runner-coder and the problem should be solved, or am I still missing
>> some key parts?
> Yes, the problem is precisely that there are (unspecified) constraints
> on the coder used by the TestStreamPayload. Just requiring that it be
> length prefixed is not enough; you also have to impose constraints on
> sometimes pushing down the length prefixing if it's a composite (like
> a KV), and these depend on what the runner is expected to support in terms
> of composites and/or the choices it makes for the channel (and the
> runner, not knowing the coder, can't transcode between these choices).
>
> The simpler solution is to constrain this
coder to just be byte[]
> rather than let it be a little bit
flexible (but not wholly flexible).
>
> As for a fully generic solution, I think the issues encountered with
> inlining Read vs. TestStream are related to this, but not really the
> same. With TestStream the Runner has an encoded representation of the
> elements provided by the SDK and has no SDK representation/execution,
> whereas with the Reads one has unencoded elements in hand and a Coder
> that is understood by both (so long as the channel can be negotiated
> correctly). FWIW, I think the proper solution to inlining a Read (or other
> Transform that would typically be executed in the SDK) is to treat it as
> a special environment (where we know logically it can work) and then
> elide, as possible, the various encodings, gRPC calls, etc. that are
> unneeded as everything is in process.
>
>>>> On 9/3/21 7:03 PM, Robert Bradshaw wrote:
>>>>> On Fri, Sep 3, 2021 at 2:40 AM Jan Lukavský <[email protected]> wrote:
>>>>>> On 9/3/21 1:06 AM, Robert Bradshaw wrote:
>>>>>>> On Thu, Sep 2, 2021 at 1:03 AM Jan Lukavský <[email protected]> wrote:
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I had some more time thinking about
this and I'll try to recap that.
>>>>>>>> First some invariants:
>>>>>>>>
>>>>>>>> a) each PCollection<T> has
actually two coders - an _SDK coder_ and a
>>>>>>>> _runner coder_. These coders have
the property, that each one can
>>>>>>>> _decode_ what the other encoded,
but the opposite is not true, the
>>>>>>>> coders cannot _encode_ what the
other _decoded_ (in general).
>>>>>>>>
>>>>>>>> b) when a PCollection<T> is computed inside an environment, the
>>>>>>>> elements are encoded using SDK coder on the side of SDK-harness and
>>>>>>>> decoded using runner coder after being received in the runner
>>>>>>>>
>>>>>>>> c) under specific circumstances, the encode-decode step can be
>>>>>>>> optimized out, that is the case where the SDK coder and all its
>>>>>>>> subcoders are all well-known to the runner (at present, that means
>>>>>>>> that all the parts are present in the model coders set). The reason
>>>>>>>> for that is that in this specific situation
>>>>>>>> runner_decode(sdk_encode(X)) = X. This property is essential.
>>>>>>> However, in general, X can only pass
from the SDK to the runner (or
>>>>>>> vice versa) in encoded form.
>>>>>> In general yes, but we are (mostly) talking about transform inlining
>>>>>> here, so in that particular situation, the elements might be passed
>>>>>> in decoded form.
>>>>>>>> d) from b) it immediately follows that when a PTransform does not
>>>>>>>> run in an environment (and this might be due to the transform being
>>>>>>>> runner native, inlined, or a source (e.g. Impulse or TestStream)) the
>>>>>>>> elements have to be encoded by SDK coder, immediately followed by a
>>>>>>>> decode by runner coder. That (surprisingly) applies even to
>>>>>>>> situations when the runner is implemented using a different language
>>>>>>>> than the client SDK, because it implies that the type of produced
>>>>>>>> elements must be one of the types encoded using model coders
>>>>>>>> (well-known to the runner, otherwise the SDK will not be able to
>>>>>>>> consume it). But - due to property c) - this means that this
>>>>>>>> encode-decode step can be optimized out. This does not mean that it
>>>>>>>> is not (logically) present, though. This is exactly the case of
>>>>>>>> native Impulse transform.
>>>>>>>>
>>>>>>>> Now, from that we can conclude that
on the boundary between executable
>>>>>>>> stages, or between runner (inlined)
transform and executable stage, each
>>>>>>>> PCollection has to be encoded using
SDK coder and immediately decoded by
>>>>>>>> runner coder, *unless this can be
optimized out* by property c).
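A toy illustration of invariants a) and c), under stated assumptions: `sdk_encode`/`sdk_decode` stand in for a custom SDK coder wrapped in a length prefix, and `runner_decode`/`runner_encode` for the runner's view of the same PCollection as length-prefixed opaque bytes. All four names are hypothetical, and the one-byte length prefix is a simplification rather than Beam's wire format.

```python
import json


def sdk_encode(element) -> bytes:
    """Hypothetical SDK coder: JSON payload behind a one-byte length prefix."""
    payload = json.dumps(element, sort_keys=True).encode("utf-8")
    if len(payload) > 255:
        raise ValueError("toy prefix only supports payloads up to 255 bytes")
    return bytes([len(payload)]) + payload


def sdk_decode(encoded: bytes):
    """Inverse of sdk_encode: skip the prefix and parse the JSON payload."""
    length = encoded[0]
    return json.loads(encoded[1:1 + length].decode("utf-8"))


def runner_decode(encoded: bytes) -> bytes:
    """Runner coder for an unknown SDK coder: strip the prefix, keep opaque bytes."""
    length = encoded[0]
    return encoded[1:1 + length]


def runner_encode(opaque: bytes) -> bytes:
    """Re-wrap opaque bytes so that the SDK coder can decode them again."""
    return bytes([len(opaque)]) + opaque


element = {"user": "jan", "count": 3}
opaque = runner_decode(sdk_encode(element))

# Invariant a): each side can decode what the other encoded.
assert sdk_decode(runner_encode(opaque)) == element

# Unknown coder: the runner holds bytes, not the element, so the
# encode-decode step cannot be replaced by identity.
assert opaque != element

# Property c): if the element type is itself bytes and the coder is the
# well-known length-prefixed bytes coder, decode(encode(x)) == x.
assert runner_decode(runner_encode(b"abc")) == b"abc"
```

In this sketch the runner can carry and forward elements of the unknown type, but it never sees anything other than their encoded bytes.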
>>>>>>>>
>>>>>>>> This gives us two options where to
implement this encode/decode step:
>>>>>>>>
>>>>>>>> 1) completely inside runner
with the possibility to optimize the
>>>>>>>> encode/decode step by identity
under right circumstances
>>>>>>>>
>>>>>>>> 2) partly in the runner and
partly in the SDK - that is we encode
>>>>>>>> elements of PCollection using SDK
coder into bytes, pass those to the
>>>>>>>> SDK harness and apply a custom
decode step there. This works because SDK
>>>>>>>> coder encoded elements are in
byte[], and that is well-known coder type.
>>>>>>>> We again only leverage property c)
and optimize the SDK coder encode,
>>>>>>>> runner decode step out.
>>>>>>>>
>>>>>>>> The option 2) is exactly the
proposal of TestStream producing byte[] and
>>>>>>>> decoding inside SDK-harness, the
TestStream is actually inlined
>>>>>>>> transform, the elements are
produced directly in runner (the SDK coder
>>>>>>>> is not known to the runner, but
that does not matter, because the
>>>>>>>> elements are already encoded by
client).
>>>>>>>>
>>>>>>>> From the above it seems to me,
that option 1) should be preferred, because:
>>>>>>>>
>>>>>>>> i) it is generic, applicable
to all inlined transforms, any sources
>>>>>>>>
>>>>>>>> ii) it is consistent with how
things logically work underneath
>>>>>>>>
>>>>>>>> iii) it offers better room
for optimization - option 2) might result
>>>>>>>> in cases when the elements are
passed from the runner to the SDK-harness
>>>>>>>> only for the sake of the decoding
from SDK coder and immediately
>>>>>>>> encoding back using SDK-coder and
returned back to the runner. This
>>>>>>>> would be the case when TestStream
would be directly consumed by inlined
>>>>>>>> (or external) transform.
>>>>>>> (1) is not possible if the Coder in
question is not known to the
>>>>>>> Runner, which is why I proposed (2).
>>>>>> There is no particular need for the
coder to be known. If transform is
>>>>>> to be inlined, what *has* to be known
is the SDK-encoded form of the
>>>>>> elements. That holds true if:
>>>>>>
>>>>>> a) either the SDK coder is known, or
>>>>>>
>>>>>> b) encoded form of the produced
elements is known in advance
>>>>>>
>>>>>> For TestStream it is the case b). For
inlined primitive Read (or any
>>>>>> other transform which executes code)
it is a).
>>>>> There's another hitch here for
TestStream. For historical reasons,
>>>>> coders actually represent two
encodings: nested (aka self delimiting)
>>>>> and unnested. TestStream elements are
given as unnested encoded bytes,
>>>>> but the nested encoding is required
for sending data to the SDK. The
>>>>> runner can't go from <nested encoding>
to <unnested encoding> for an
>>>>> arbitrary unknown coder.
>>>>>
>>>>> (Even if it weren't for this
complication, to be able to send already
>>>>> encoded bytes of an unknown coder to
the SDK will also complicate the
>>>>> logic in choosing the coder to be used
for the channel and sending the
>>>>> data, which is some of what you're
running into (but can be solved
>>>>> differently for inlined reads as the
coder can always be known by the
>>>>> runner).)