On 9/3/21 1:06 AM, Robert Bradshaw wrote:
On Thu, Sep 2, 2021 at 1:03 AM Jan Lukavský <[email protected]> wrote:
Hi,

I had some more time thinking about this and I'll try to recap that.
First some invariants:

   a) each PCollection<T> actually has two coders - an _SDK coder_ and a
_runner coder_. These coders have the property that each one can
_decode_ what the other encoded; the opposite does not hold - in
general, neither coder can _encode_ what the other _decoded_.

   b) when a PCollection<T> is computed inside an environment, the
elements are encoded using the SDK coder on the SDK-harness side and
decoded using the runner coder after being received by the runner

   c) under specific circumstances, the encode-decode step can be
optimized out - that is the case where the SDK coder and all its
subcoders are well-known to the runner (at present, that means that
all the parts are present in the model coders set). The reason is that
in this specific situation runner_decode(sdk_encode(X)) = X.
This property is essential.
However, in general, X can only pass from the SDK to the runner (or
vice versa) in encoded form.
In general yes, but we are (mostly) talking about transform inlining here, so in that particular situation the elements might be passed in decoded form.

   d) from b) it immediately follows that when a PTransform does not run in
an environment (which might be because the transform is runner
native, inlined, or a source (e.g. Impulse or TestStream)), the elements have
to be encoded by the SDK coder, immediately followed by a decode with the
runner coder. That (surprisingly) applies even to situations when the runner is
implemented in a different language than the client SDK, because it
implies that the type of produced elements must be one of the types encoded
using model coders (well-known to the runner; otherwise the SDK would not
be able to consume it). But - due to property c) - this means that this
encode-decode step can be optimized out. This does not mean that it is
not (logically) present, though. This is exactly the case of the native
Impulse transform.

Now, from that we can conclude that on the boundary between executable
stages, or between a runner (inlined) transform and an executable stage, each
PCollection has to be encoded using the SDK coder and immediately decoded by
the runner coder, *unless this can be optimized out* by property c).
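
To make property c) concrete, here is a minimal snippet (assuming the
Java SDK's CoderUtils helper; names are illustrative) showing that when
the coder tree consists of model coders only,
runner_decode(sdk_encode(X)) = X holds and the step collapses to the
identity:

import org.apache.beam.sdk.coders.*;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;

// KvCoder, VarLongCoder and StringUtf8Coder are all model coders, so
// the SDK coder and the runner coder coincide here.
Coder<KV<Long, String>> coder = KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of());
KV<Long, String> x = KV.of(42L, "answer");
byte[] wire = CoderUtils.encodeToByteArray(coder, x);                // sdk_encode(X)
KV<Long, String> back = CoderUtils.decodeFromByteArray(coder, wire); // runner_decode(...)
// back.equals(x) holds, so the encode/decode step can be optimized out.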

This gives us two options for where to implement this encode/decode step:

   1) completely inside the runner, with the possibility to optimize the
encode/decode step to the identity under the right circumstances

   2) partly in the runner and partly in the SDK - that is, we encode the
elements of the PCollection using the SDK coder into bytes, pass those to the
SDK harness and apply a custom decode step there. This works because the
SDK-coder-encoded elements are byte[], and that is a well-known coder type.
We again only leverage property c) and optimize out the SDK coder encode,
runner decode step.

Option 2) is exactly the proposal of TestStream producing byte[] and
decoding inside the SDK-harness; TestStream here is actually an inlined
transform - the elements are produced directly in the runner (the SDK coder
is not known to the runner, but that does not matter, because the
elements are already encoded by the client).

  From the above it seems to me that option 1) should be preferred, because:

   i) it is generic, applicable to all inlined transforms and any sources

   ii) it is consistent with how things logically work underneath

   iii) it offers better room for optimization - option 2) might result
in cases where the elements are passed from the runner to the SDK-harness
only for the sake of decoding from the SDK coder and immediately
encoding back using the SDK coder, to be returned to the runner. That
would be the case when TestStream is directly consumed by an inlined
(or external) transform.
(1) is not possible if the Coder in question is not known to the
Runner, which is why I proposed (2).

There is no particular need for the coder to be known. If a transform is to be inlined, what *has* to be known is the SDK-encoded form of the elements. That holds true if:

 a) either the SDK coder is known, or

 b) the encoded form of the produced elements is known in advance

For TestStream it is case b). For the inlined primitive Read (or any other transform which executes code) it is case a).


On 9/1/21 9:41 AM, Jan Lukavský wrote:
On 9/1/21 9:15 AM, Robert Bradshaw wrote:

On Tue, Aug 31, 2021 at 11:48 PM Jan Lukavský <[email protected]> wrote:
Sorry, I needed some time to let that sink in. :-)

I think I understand why (and how) this will work for TestStream, still
have a couple of notes, though:

    a) the problem of type compatibility arises with the primitive
Read as well; though we can solve it with a different expansion for
TestStream, that solution is not applicable to Read, because it has a
different contract

    b) the same exact problem will arise every time we inline any
transform that would otherwise be run in an environment

    c) extracting bytes from TestStream actually bypasses "model coders"
on the side of the runner; should we do that in other cases as well?

The TestStream problem is a special case; I think a more generic
solution would be better.
Yep, TestStream is special and simpler. I'm not sure something generic
is always possible.

Two questions:

    1) Could we create a mapping in runners-core-construction-java that
would take the Pipeline proto and a PCollectionId and create a mapping
function from "sdk coder space" to "runner coder space"? That could be
optimized to the identity if the coder of the PCollection consists of model
coders only - in that case the SDK coder and runner coder are identical,
producing the same types. This mapping function could be reused by the
portable TestStream, the inlined primitive Read and any other future
inlined transform.
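
Something like the following hypothetical sketch (sdkToRunner, and
resolving the two coders from the proto, are assumptions, not an
existing API):

import java.util.function.Function;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.util.CoderUtils;

// sdkCoder and runnerCoder would be resolved from the Pipeline proto
// for the given PCollectionId.
@SuppressWarnings("unchecked")
static <SdkT, RunnerT> Function<SdkT, RunnerT> sdkToRunner(
    Coder<SdkT> sdkCoder, Coder<RunnerT> runnerCoder) {
  if (sdkCoder.equals(runnerCoder)) {
    // coder tree consists of model coders only - identity mapping
    return element -> (RunnerT) element;
  }
  return element -> {
    try {
      // generic path: SDK-encode, then runner-decode
      return CoderUtils.decodeFromByteArray(
          runnerCoder, CoderUtils.encodeToByteArray(sdkCoder, element));
    } catch (CoderException e) {
      throw new IllegalStateException(e);
    }
  };
}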
This could help with the specific case of Java transforms being
inlined on a Java-based runner. I'd rather avoid Java-specific
solutions if possible. (E.g. the ULR is in Python, and Dataflow Runner
v2 is in C++.)
The definition is generic, language agnostic. It can be rephrased as
"if the SDK of a runner matches the SDK of a transform, the transform can
be inlined, and in order to be type-compatible, a coder mapping between
SDK coder space and runner coder space has to be applied". Yes, when
we implement this in "runners-core-construction-java", the "runner
SDK" and "transform SDK" both equal the Java SDK. But that is
implied by the implementation, not the definition. For Go or Python we
can implement exactly the same.
I'd say if a runner decides to truly inline a transform with an
alternative implementation rather than calling the SDK, it would only do so
in cases where it understands the input and output types, if any.
(With Read we have a case where the output type is not fixed, which
complicates things.) Generally I would probably structure most
"inlined" operations as ones that happen to execute in an in-process
environment rather than an external one, which would mean that
"inlined" ones would not have to be handled specially, but could just
fall out of the existing length prefixing logic.

Of course just making SDFs work well on all runners would be the best
long term solution for Read, and avoid all these issues, but we may
need hacks in the meantime.
I think the problem is broader than sources, so while it is true that
SDF would help, it is not the solution.
    2) Why does the runner need to understand the types it processes, and
why does it not work with raw bytes all the time, including model coders,
the same way as the proposed solution for TestStream, but for all transforms?
The first step of every executable stage would then be to decode the
input from raw bytes and only then process it. What is the benefit of the
runner understanding _some_ of the coders? It would be required for a
runner to understand coders for root transforms (Impulse? any others?),
but are there any other places where this is necessary?
A runner needs to understand the KV coder to execute GroupByKey (and
the result is a KV<K, Iterable<V>>, so IterableCoder is a requirement
as well). WindowedCoders are likewise needed so the runner can
deconstruct the timestamps, etc. Timers are sent via TimerCoders. A
runner also needs a way to deal with unknown coders, hence
LengthPrefixCoder. Aside from that, the runner can just deal in raw
bytes (though sometimes there are efficiency gains to understanding
more coders, e.g. wrapping tiny integers in LengthPrefix could be
rather wasteful).
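
For illustration, the overhead on tiny integers is easy to see
(snippet, Java SDK coders):

import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.util.CoderUtils;

byte[] plain = CoderUtils.encodeToByteArray(VarIntCoder.of(), 5);
byte[] prefixed = CoderUtils.encodeToByteArray(
    LengthPrefixCoder.of(VarIntCoder.of()), 5);
// plain.length == 1 (just the varint); prefixed.length == 2 (length
// prefix + varint), i.e. the prefix doubles the size of a small int.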

Well known coders, such as RowCoder, are of course useful for
communication between SDKs of different languages, even if the runner
doesn't care about them.
+1, understood and agree.
On 9/1/21 1:37 AM, Robert Bradshaw wrote:
On Tue, Aug 31, 2021 at 4:31 PM Jan Lukavský <[email protected]> wrote:
If I can depict that:

TestStream0 -> TestStream1 (DoFn), that is

TestStream0 -> TestStream1 (executable stage) -> consumer 2
(executable
stage)
Yes. Expanding it out more,

TestStream0 -> pc1 -> TestStream1 -> pc2 -> Consumer2 -> ...

Where pc1 is a PCollection<bytes> and pc2 is a PCollection<T>.

In this scenario TestStream0 can produce byte[], and TestStream1 can
consume them; the problem is that if the coder of TestStream0
I assume you mean TestStream1, as TestStream0 has no coder.

is some
(recursively) known model coder (e.g. KvCoder), then consumer 2
will not
be able to decode that data.
We know that, by construction, TestStream1 and Consumer2 both are
executed in an environment that understands pc2's Coder<T>. The
difference here is that the runner need not understand Coder<T> as it
could inject length prefixing on pc2 if necessary.


On 9/1/21 1:24 AM, Robert Bradshaw wrote:
On Tue, Aug 31, 2021 at 4:12 PM Jan Lukavský <[email protected]>
wrote:
Fortunately, there's no value in having the primitive TestStream
produce elements of arbitrary type (vs. sources, where the read is
inherently typed depending on the source and particular library
used
for reading it).
I disagree with this one - the consumer(s) of elements from
TestStream are (should be treated as) unknown to the runner; they might
be inlined, might be x-lang, and whether or not they will pass the
boundary of an executable stage - all these questions are important.
The coder (whether it is a model coder or not, and what the
sub-coders are) must be precisely known to all parties that cooperate
on the computation, because otherwise these parties might not agree
on the binary representation.
What I'm saying is that there's little value in a primitive
TestStream that can produce all sorts of T, vs. a composite
TestStream
consisting of a TestStream0 primitive producing only bytes, followed
by a TestStream1 that decodes those bytes into T. This sidesteps all
issues of coder compatibility, as bytes is well-known and
TestStream1
is just a DoFn.
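
For illustration, such a composite could look roughly like this (a
sketch only; TestStreamBytesPrimitive is a hypothetical stand-in for
the bytes-only TestStream0 primitive):

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;

class TypedTestStream<T> extends PTransform<PBegin, PCollection<T>> {
  private final Coder<T> elementCoder;

  TypedTestStream(Coder<T> elementCoder) {
    this.elementCoder = elementCoder;
  }

  @Override
  public PCollection<T> expand(PBegin input) {
    return input
        // TestStream0: primitive emitting pre-encoded payloads as bytes
        .apply("TestStream0", new TestStreamBytesPrimitive())
        // TestStream1: just a DoFn decoding those bytes into T
        .apply("TestStream1", ParDo.of(new DoFn<byte[], T>() {
          @ProcessElement
          public void process(@Element byte[] encoded, OutputReceiver<T> out)
              throws CoderException {
            out.output(CoderUtils.decodeFromByteArray(elementCoder, encoded));
          }
        }))
        .setCoder(elementCoder);
  }
}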

On 9/1/21 12:59 AM, Robert Bradshaw wrote:
Due to issues with Flink executing Reads as SDFs, there is an
alternative mode where Read is treated as a primitive and executed
directly in the runner. This, of course, requires that the
Read's UDF
be implemented in the same language as the Runner
(specifically, in
this case, Java) and their interpretation of what Coders should or
should not be wrapped agree (lest other complications arise).

It's true one can view TestStream as a source, but it's a bit more
than that, as it has the ability to wait for quiescence before
emitting
more elements/watermark updates to give stronger guarantees on
determinism, which generally requires deeper runner
integration. In
addition, TestStream is not associated with a UDF or a specific
SDK the
way Sources are, which is where the problem arises: TestStream
gives
the encoded form of the elements, but the runner may need to
produce a
different encoded form of the elements (say, one with length
prefixing), which is not possible without knowing the Coder.
Fortunately, there's no value in having the primitive TestStream
produce elements of arbitrary type (vs. sources, where the read is
inherently typed depending on the source and particular library
used
for reading it).


On Tue, Aug 31, 2021 at 3:41 PM Ke Wu <[email protected]> wrote:
Read does not have a translation in portability, so the
implication is that it needs to be a primitive transform
explicitly implemented by the runner. The encoding/decoding
has to happen in the runner.


Could you help me understand this a bit more? IIRC, the fact that
Read is NOT being translated in portable mode means exactly that it
is a composite transform instead of a primitive, because all primitive
transforms are required to be translated. In addition, Read is
a composite transform built on Impulse, which produces dummy bytes
[1] to trigger the subsequent ParDo/ExecutableStage, where
the decoding of the actual source happens [2]

There seems to be no role for the SDK harness with regard to
TestStream, because the elements are already encoded by
the submitting SDK. The coders must match nevertheless,
because you can have Events of type
KV<KV<WindowedValue<Integer, Object>>> and what will and what
will not get length-prefixed depends on which parts exactly
are "known" (model) coders and which are not. Encoding the
whole value as a single byte array will not work for the
consuming SDK harness, which will expect to see
nested KvCoders instead.


I don't think I fully understand what you say here. TestStream
is currently a primitive transform, therefore there is no role
for the SDK harness. This is what the proposal aims to change: to make
TestStream a composite transform with a primitive transform
and a subsequent ParDo to decode to the desired format.


[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Impulse.java#L39
[2]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L149

On Aug 31, 2021, at 3:21 PM, Jan Lukavský <[email protected]>
wrote:

On 9/1/21 12:13 AM, Ke Wu wrote:

Hi Jan,

Here is my understanding,

The runner is brought up by the job server driver, which is up
and running before the job submission, i.e. it is job
agnostic. Therefore, the runner that is brought up does not have
any SDK coder available, and artifact staging only happens for
SDK workers.

You are right that Read and TestStream are sources; however,
the one thing that distinguishes them is that the Read transform is
a composite transform and the decoding happens in
ParDo/ExecutableStage, i.e. on the SDK worker.

Read does not have a translation in portability, so the
implication is that it needs to be a primitive transform
explicitly implemented by the runner. The encoding/decoding
has to happen in the runner.


The proposal here is also to make the public facing TestStream
transform a composite transform instead of a primitive, so
that the decoding occurs on the SDK worker side where the SDK
coder is available. The primitive that powers TestStream
would be directly translated by the runner to always produce
raw bytes, and these raw bytes would be decoded on the SDK
worker side.

There seems to be no role for the SDK harness with regard to
TestStream, because the elements are already encoded by
the submitting SDK. The coders must match nevertheless,
because you can have Events of type
KV<KV<WindowedValue<Integer, Object>>> and what will and what
will not get length-prefixed depends on which parts exactly
are "known" (model) coders and which are not. Encoding the
whole value as a single byte array will not work for the
consuming SDK harness, which will expect to see
nested KvCoders instead.


Best,
Ke

On Aug 31, 2021, at 2:56 PM, Jan Lukavský <[email protected]>
wrote:

Sorry if I'm missing something obvious, but I don't quite see
the difference between Read and TestStream regarding the
discussed issue with coders. A couple of thoughts:

      a) both Read and TestStream are _sources_ - they produce
elements that are consumed by downstream transforms

      b) the coder of a particular PCollection is defined by
the Pipeline proto - it is the (client side) SDK that owns the
Pipeline and that defines all the coders

      c) runners must adhere to these coders, because otherwise
there is a risk of coder mismatch, most probably on edges like
x-lang transforms or inlined transforms

I tried the approach of encoding the output of Read into a byte
array as well, but that turns out to have a problem: once
there is a (partially) known coder in play, this does not
work, because the consuming transform (executable stage)
expects to see the wire coder - and that is not simply a byte array
coder, because the type of the elements might be for instance KV<K, V>,
where KvCoder is one of the ModelCoders. That does not encode
using LengthPrefixCoder and as such will be incompatible with
LengthPrefixCoder(ByteArrayCoder). TestStream needs to
know the coder of the elements, because that defines where exactly
length-prefixing must or must not be inserted. The logic in
LengthPrefixUnknownCoders [1] is recursive for ModelCoders.

[1]
https://github.com/apache/beam/blob/ff70e740a2155592dfcb302ff6303cc19660a268/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L48
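
To illustrate the recursion at the Java-coder level (sketch only - the
real LengthPrefixUnknownCoders [1] works on the proto representation,
and isModelCoder is a hypothetical stand-in predicate):

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;

@SuppressWarnings({"rawtypes", "unchecked"})
static Coder lengthPrefixUnknown(Coder coder) {
  if (coder instanceof KvCoder) {
    // model coders keep their structure; recurse into the components
    KvCoder kv = (KvCoder) coder;
    return KvCoder.of(
        lengthPrefixUnknown(kv.getKeyCoder()),
        lengthPrefixUnknown(kv.getValueCoder()));
  }
  // only unknown leaf coders get wrapped
  return isModelCoder(coder) ? coder : LengthPrefixCoder.of(coder);
}

// KvCoder(VarLongCoder, CustomCoder) thus becomes
// KvCoder(VarLongCoder, LengthPrefixCoder(CustomCoder)), which is not
// binary-compatible with LengthPrefixCoder(ByteArrayCoder) over the
// whole KV.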

On 8/31/21 11:29 PM, Ke Wu wrote:

Awesome! Thank you Luke and Robert.

Also created https://issues.apache.org/jira/browse/BEAM-12828
to track the unit test conversion. I could take it after I update the
Samza runner to support TestStream in portable mode.

On Aug 31, 2021, at 2:05 PM, Robert Bradshaw
<[email protected]> wrote:

Created https://issues.apache.org/jira/browse/BEAM-12827 to
track this.

+1 to converting tests to just use longs for better coverage
for now.

Also, yes, this is very similar to the issues encountered by
Reads,
but the solution is a bit simpler as there's no need for the
TestStream primitive to interact with the decoded version of the
elements (unlike Reads, where the sources often give elements in
un-encoded form) and no user code to run.

- Robert



On Tue, Aug 31, 2021 at 11:00 AM Jan Lukavský
<[email protected]> wrote:


This looks similar to (and likely has the same cause as) what I
have experienced when making the primitive Read supported by
Flink. The final solution would be to make SDK coders known to
a runner built with the same SDK (an idea already present in various
other threads). But until then, the solution seems to be
something like [1]. The root cause is that the executable
stage expects its input to be encoded by the SDK harness, and
that part is missing when the transform is inlined (like Read
in my case, or TestStream in your case). The intoWireTypes
method simulates precisely this part - it encodes the
PCollection via the coder defined in the SDK harness and then
decodes it with the coder defined by the runner (the two match on
the binary level, but produce different types).
Jan

[1]
https://github.com/apache/beam/blob/dd7945f9f259a2989f9396d1d7a8dcb122711a52/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L657

On 8/31/21 7:27 PM, Luke Cwik wrote:

I originally wasn't for making it a composite because it
changes the "graph" structure, but the more I thought about it
the more I liked it.

On Tue, Aug 31, 2021 at 10:06 AM Robert Bradshaw
<[email protected]> wrote:


On Tue, Aug 31, 2021 at 9:18 AM Luke Cwik <[email protected]>
wrote:


On Mon, Aug 30, 2021 at 7:07 PM Ke Wu <[email protected]> wrote:


Hello everyone,

This is Ke. I am working on enabling TestStream support for the
Samza Runner in portable mode and discovered something unexpected.

In my implementation for the Samza Runner, a couple of tests are
failing with errors like:


java.lang.ClassCastException: java.lang.Integer cannot be cast
to [B

I noticed these tests have the same symptom on the Flink Runner as
well, where they are currently excluded:

https://issues.apache.org/jira/browse/BEAM-12048
https://issues.apache.org/jira/browse/BEAM-12050


After some more digging, I realized that it is caused by the
combination of the following facts:

TestStream is a primitive transform; therefore runners are
supposed to translate it directly, and the most intuitive
implementation for each runner is to parse the payload
to decode TestStream.Event [1] in the runner process, to be
handed over to subsequent stages.
When TestStream is used with Integers, i.e. initialized with
VarIntCoder, then since VarIntCoder is NOT a registered ModelCoder
[2], it is treated as a custom coder during conversion to the
protobuf pipeline [3] and replaced with a byte array
coder [4] when the runner sends data to the SDK worker.
Therefore an error occurs, because the decoded TestStream.Event
has an Integer as its value but the remote input receiver is
expecting a byte array, causing java.lang.ClassCastException:
java.lang.Integer cannot be cast to [B (sketched below).
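
In coder terms the mismatch looks like this (illustrative snippet only):

import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;

Object element = 42; // value held by the decoded TestStream.Event
// after the custom-coder substitution [4], the wire coder expects byte[]:
Coder<byte[]> wireCoder = LengthPrefixCoder.of(ByteArrayCoder.of());
byte[] asBytes = (byte[]) element; // java.lang.Integer cannot be cast to [B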


In addition, I tried updating all these failed tests to use
Long instead of Integer, and all tests pass, since
VarLongCoder is a known coder. I do understand that the runner
process does not have user artifacts staged, so it can only use
coders in the Beam model when communicating with the SDK worker process.

Couple of questions on this:

1. Is it expected that VarIntCoder is not a known coder?



Yes, since no one has worked to make it a well known coder.


The notion of "integer" vs. "long" is also language-specific
detail as
well, so not sure it makes sense as a well-known coder.

It can be made a well known coder, and this would solve the
immediate problem, but not the long-term issue of portable
TestStream not supporting arbitrary types.


+1. Rather than making the coder a property of TestStream, I would
be in favor of the TestStream primitive always producing bytes
(basically, by definition), and providing a composite that consists
of this followed by a decoding step to give us a typed TestStream.


2. Is TestStream always supposed to translate the payload
as raw bytes, so that the runner process can always send it
to the SDK worker with the default byte array coder and ask the SDK
worker to decode accordingly?



Having the runner always treat it as bytes and not T is likely
the best solution, but isn't necessary.

3. If yes to 2), does it mean that TestStream needs to be
translated in a completely different way in portable mode than in
classic mode, since in classic mode the translator can directly
translate the payload to its final format?


There are a few ways to fix the current implementation to work
for all types. One way would be to require the
encoded_element to be the "nested" encoding and then ensure
that the runner uses a WindowedValue<ByteArrayCoder in outer
context> and the SDK uses WindowedValue<T> (note that this
isn't WindowedValue<LengthPrefix<T>>) for the wire coders.
This is quite annoying because the runner inserts length
prefixing in a lot of places (effectively every time it sees
an unknown type), so we would need to special case this and
propagate this correction through any runner native transforms
(e.g. GBK) until the SDK consumes it.
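
The reason the first option can line up at all is the outer-context
behavior of the byte-array coder (illustrative snippet):

import java.io.ByteArrayOutputStream;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.util.CoderUtils;

// in the OUTER context ByteArrayCoder writes its bytes raw, without an
// extra length prefix, so an already-encoded element passes through
// byte-for-byte to the SDK's WindowedValue<T>
byte[] encodedElement = CoderUtils.encodeToByteArray(VarIntCoder.of(), 42);
ByteArrayOutputStream out = new ByteArrayOutputStream();
ByteArrayCoder.of().encode(encodedElement, out, Coder.Context.OUTER);
// out.toByteArray() equals encodedElement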

Another way would be to ensure that the SDK always uses
LengthPrefix<T> as the PCollection encoding and the
encoded_element format. This would mean that the runner can
translate it to a T if it so chooses and won't have the
annoying special case propagation logic. This leaks the length
prefixing into the SDK at graph construction time which is not
what it was meant for.

Swapping to an existing well known type is by far the
easiest approach, as you discovered, and won't impact the
correctness of the tests.


Best,
Ke


[1]
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java#L52
[2]
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java#L65
[3]
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java#L99
[4]
https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L93



