Let me depict it:

TestStream0 -> TestStream1 (DoFn), that is

TestStream0 -> TestStream1 (executable stage) -> consumer 2 (executable stage)

In this scenario TestStream0 can produce byte[] and TestStream1 can consume them; the problem is that if the coder of the elements is some (recursively) known model coder (e.g. KvCoder), then consumer 2 will not be able to decode that data.
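To make that concrete, here is a minimal sketch of the binary incompatibility, assuming the Beam Java SDK's coder classes (illustrative only, not any runner's actual code):

    import org.apache.beam.sdk.coders.ByteArrayCoder;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.KvCoder;
    import org.apache.beam.sdk.coders.LengthPrefixCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.coders.VarLongCoder;
    import org.apache.beam.sdk.util.CoderUtils;
    import org.apache.beam.sdk.values.KV;

    public class CoderMismatch {
      public static void main(String[] args) throws Exception {
        // The wire format consumer 2 expects: a nested KvCoder encoding.
        KvCoder<Long, String> kvCoder = KvCoder.of(VarLongCoder.of(), StringUtf8Coder.of());
        byte[] expected = CoderUtils.encodeToByteArray(kvCoder, KV.of(1L, "a"));

        // What it would receive if TestStream0 emitted opaque bytes instead:
        Coder<byte[]> bytesCoder = LengthPrefixCoder.of(ByteArrayCoder.of());
        byte[] actual = CoderUtils.encodeToByteArray(bytesCoder, expected);

        // The two binary representations differ, so decoding with kvCoder fails.
        System.out.println(expected.length + " != " + actual.length);
      }
    }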

On 9/1/21 1:24 AM, Robert Bradshaw wrote:
On Tue, Aug 31, 2021 at 4:12 PM Jan Lukavský <[email protected]> wrote:
Fortunately, there's no value in having the primitive TestStream
produce elements of arbitrary type (vs. sources, where the read is
inherently typed depending on the source and particular library used
for reading it).
I disagree with this one - the consumer(s) of the elements from TestStream
are (or should be treated as) unknown to the runner: they might be inlined,
might be x-lang, and might or might not cross the boundary of an executable
stage. All of these questions are important. The coder (whether it is a model
coder or not, and what its sub-coders are) must be precisely known to
all parties that cooperate on the computation, because otherwise these
parties might not agree on the binary representation.
What I'm saying is that there's little value in a primitive
TestStream that can produce all sorts of T, vs. a composite TestStream
consisting of a TestStream0 primitive producing only bytes, followed
by a TestStream1 that decodes those bytes into T. This sidesteps all
issues of coder compatibility, as bytes is well-known and TestStream1
is just a DoFn.
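For illustration, TestStream1 could be as simple as the following sketch (DecodeFromBytesFn is a hypothetical name):

    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.util.CoderUtils;

    // TestStream1 as a plain DoFn: decode the primitive's byte[] output into
    // typed elements using the element coder (illustrative sketch).
    class DecodeFromBytesFn<T> extends DoFn<byte[], T> {
      private final Coder<T> elementCoder;

      DecodeFromBytesFn(Coder<T> elementCoder) {
        this.elementCoder = elementCoder;
      }

      @ProcessElement
      public void processElement(@Element byte[] encoded, OutputReceiver<T> out)
          throws Exception {
        out.output(CoderUtils.decodeFromByteArray(elementCoder, encoded));
      }
    }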

On 9/1/21 12:59 AM, Robert Bradshaw wrote:
Due to issues with Flink executing Reads as SDFs, there is an
alternative mode where Read is treated as a primitive and executed
directly in the runner. This, of course, requires that the Read's UDF
be implemented in the same language as the Runner (specifically, in
this case, Java) and that their interpretations of which Coders should
or should not be wrapped agree (lest other complications arise).

It's true one can view TestStream as a source, but it's a bit more
than that, as it has the ability to wait for quiescence before emitting
more elements/watermark updates to give stronger guarantees on
determinism, which generally requires deeper runner integration. In
addition, TestStream is not associated with a UDF or specific SDK the
way Sources are, which is where the problem arises: TestStream gives
the encoded form of the elements, but the runner may need to produce a
different encoded form of the elements (say, one with length
prefixing), which is not possible without knowing the Coder.
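To illustrate, the re-encoding the runner would need amounts to something like this hypothetical helper (only possible when elementCoder is known):

    // Hypothetical helper: the runner cannot perform this re-encoding
    // without knowing elementCoder.
    static <T> byte[] reencode(byte[] encoded, Coder<T> elementCoder, Coder<T> wireCoder)
        throws Exception {
      T value = CoderUtils.decodeFromByteArray(elementCoder, encoded);
      return CoderUtils.encodeToByteArray(wireCoder, value);
    }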
Fortunately, there's no value in having the primitive TestStream
produce elements of arbitrary type (vs. sources, where the read is
inherently typed depending on the source and particular library used
for reading it).


On Tue, Aug 31, 2021 at 3:41 PM Ke Wu <[email protected]> wrote:
Read does not have a translation in portability, so it needs to be a primitive
transform explicitly implemented by the runner. The encoding/decoding has to
happen in the runner.


Could you help me understand this a bit more? IIRC, the fact that Read is NOT
translated in portable mode means exactly that it is a composite transform
instead of a primitive, because all primitive transforms are required to be
translated. In addition, Read is a composite transform built on Impulse, which
produces dummy bytes [1] to trigger the subsequent ParDo/ExecutableStage, where
the decoding of the actual source happens [2]
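Schematically, that expansion has roughly the following shape (a sketch; the DoFn parameter stands in for the actual SDF machinery in Read.java):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.Impulse;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PBegin;
    import org.apache.beam.sdk.values.PCollection;

    // Schematic shape of the portable Read expansion; readFromSourceFn is a
    // hypothetical stand-in for the actual SDF machinery in Read.java.
    static <T> PCollection<T> expandRead(PBegin input, DoFn<byte[], T> readFromSourceFn) {
      return input
          .apply(Impulse.create())             // emits a single dummy byte[]
          .apply(ParDo.of(readFromSourceFn));  // SDK worker does the real read/decode
    }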

There seems to be no role for the SDK harness with regard to the TestStream, because the elements are
already encoded by the submitting SDK. The coders must match nevertheless, because you can have Events of
type KV<KV<WindowedValue<Integer, Object>>>, and what will and what will not get
length-prefixed depends on which parts exactly are "known" (model) coders and which are not.
Encoding the whole value as a single byte array will not work for the consuming SDK harness, which will see
that there should be nested KvCoders instead.


I don’t think I fully understand what you say here. TestStream is currently a
primitive transform, therefore there is no role for the SDK harness. This is what
the proposal aims to change: make TestStream a composite transform consisting of a
primitive transform and a subsequent ParDo that decodes to the desired format.


[1] 
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Impulse.java#L39
[2] 
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L149

On Aug 31, 2021, at 3:21 PM, Jan Lukavský <[email protected]> wrote:

On 9/1/21 12:13 AM, Ke Wu wrote:

Hi Jan,

Here is my understanding,

The runner is brought up by the job server driver, which is up and running before
the job submission, i.e. it is job agnostic. Therefore, the runner does not have
any SDK coders available, and artifact staging only happens for SDK workers.

You are right that Read and TestStream are sources; however, the one thing that
distinguishes them is that the Read transform is a composite transform and the
decoding happens in a ParDo/ExecutableStage, i.e. on the SDK worker.

Read does not have a translation in portability, so it needs to be a primitive
transform explicitly implemented by the runner. The encoding/decoding has to
happen in the runner.


The proposal here is also to make the public-facing TestStream transform a
composite transform instead of a primitive, so that the decoding occurs on the
SDK worker side where the SDK coder is available. The primitive that powers
TestStream would be directly translated by the runner to always produce raw
bytes, and these raw bytes would be decoded on the SDK worker side.

There seems to be no role for the SDK harness with regard to the TestStream, because the elements are
already encoded by the submitting SDK. The coders must match nevertheless, because you can have Events of
type KV<KV<WindowedValue<Integer, Object>>>, and what will and what will not get
length-prefixed depends on which parts exactly are "known" (model) coders and which are not.
Encoding the whole value as a single byte array will not work for the consuming SDK harness, which will see
that there should be nested KvCoders instead.


Best,
Ke

On Aug 31, 2021, at 2:56 PM, Jan Lukavský <[email protected]> wrote:

Sorry if I'm missing something obvious, but I don't quite see the difference
between Read and TestStream regarding the discussed issue with coders. A couple
of thoughts:

   a) both Read and TestStream are _sources_ - they produce elements that are 
consumed by downstream transforms

   b) the coder of a particular PCollection is defined by the Pipeline proto -
it is the (client-side) SDK that owns the Pipeline and that defines all the
coders

   c) runners must adhere to these coders, because otherwise there is a risk of
a coder mismatch, most probably on edges like x-lang transforms or inlined
transforms

I tried the approach of encoding the output of Read into a byte array as well, but it
turns out that once a (partially) known coder is in play, this does not work, because
the consuming transform (executable stage) expects to see the wire coder - and that is
not simply a byte array, because the type of the elements might be, for instance,
KV<K, V>, where KvCoder is one of the ModelCoders. That does not
encode using LengthPrefixCoder and as such will be incompatible with
LengthPrefixCoder(ByteArrayCoder). The TestStream needs to know the coder of the
elements, because that defines exactly where length-prefixing must or must not be
inserted. The logic in LengthPrefixUnknownCoders [1] is recursive for
ModelCoders.

[1] 
https://github.com/apache/beam/blob/ff70e740a2155592dfcb302ff6303cc19660a268/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L48
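For illustration, the effect of that recursion on a KV type with one unknown component is roughly the following (a sketch; customCoder stands for any non-model coder):

    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.KvCoder;
    import org.apache.beam.sdk.coders.LengthPrefixCoder;
    import org.apache.beam.sdk.coders.VarLongCoder;
    import org.apache.beam.sdk.values.KV;

    // Illustrative effect of the recursion on a KV type with one unknown
    // component; customCoder stands for any coder that is not a model coder.
    static <V> Coder<KV<Long, V>> wireCoderFor(Coder<V> customCoder) {
      // Declared by the SDK: KvCoder.of(VarLongCoder.of(), customCoder).
      // Model coders (KvCoder, VarLongCoder) are kept; only the unknown
      // sub-coder is wrapped, rather than prefixing the whole KV:
      return KvCoder.of(VarLongCoder.of(), LengthPrefixCoder.of(customCoder));
    }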

On 8/31/21 11:29 PM, Ke Wu wrote:

Awesome! Thank you Luke and Robert.

Also created https://issues.apache.org/jira/browse/BEAM-12828 to track the unit
test conversion. I can take it after I update the Samza runner to support
TestStream in portable mode.

On Aug 31, 2021, at 2:05 PM, Robert Bradshaw <[email protected]> wrote:

Created https://issues.apache.org/jira/browse/BEAM-12827 to track this.

+1 to converting tests to just use longs for better coverage for now.

Also, yes, this is very similar to the issues encountered by Reads,
but the solution is a bit simpler, as there's no need for the
TestStream primitive to interact with the decoded version of the
elements (unlike Reads, where the sources often give elements in
unencoded form) and no user code to run.

- Robert



On Tue, Aug 31, 2021 at 11:00 AM Jan Lukavský <[email protected]> wrote:


This looks similar to (and likely has the same cause as) what I experienced
when making the primitive Read supported by Flink. The final solution would be to
make SDK coders known to the runner of the same SDK (already discussed in various
other threads). But until then, the solution seems to be something like
[1]. The root cause is that the executable stage expects its input to be
encoded by the SDK harness, and that part is missing when the transform is
inlined (like Read in my case, or TestStream in your case). The intoWireTypes
method simulates precisely this part - it encodes the PCollection via the coder
defined in the SDK harness and then decodes it via the coder defined by the runner
(the two match on the binary level, but produce different types).

Jan

[1] 
https://github.com/apache/beam/blob/dd7945f9f259a2989f9396d1d7a8dcb122711a52/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L657
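Conceptually, intoWireTypes performs something like this per-element round trip (simplified sketch; see [1] for the real implementation):

    // Simplified sketch of the per-element round trip in intoWireTypes;
    // sdkCoder/runnerCoder stand for the two binary-compatible coders.
    static <SdkT, RunnerT> RunnerT toRunnerType(
        SdkT sdkValue, Coder<SdkT> sdkCoder, Coder<RunnerT> runnerCoder) throws Exception {
      // Encode with the coder the SDK harness would have used, then decode
      // with the runner-side coder; they agree on the binary level but
      // produce different (runner-native) types.
      byte[] wireBytes = CoderUtils.encodeToByteArray(sdkCoder, sdkValue);
      return CoderUtils.decodeFromByteArray(runnerCoder, wireBytes);
    }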

On 8/31/21 7:27 PM, Luke Cwik wrote:

I originally wasn't for making it a composite because it changes the "graph"
structure, but the more I thought about it, the more I liked it.

On Tue, Aug 31, 2021 at 10:06 AM Robert Bradshaw <[email protected]> wrote:


On Tue, Aug 31, 2021 at 9:18 AM Luke Cwik <[email protected]> wrote:


On Mon, Aug 30, 2021 at 7:07 PM Ke Wu <[email protected]> wrote:


Hello everyone,

This is Ke. I am working on enabling TestStream support for the Samza Runner in
portable mode and discovered something unexpected.

In my implementation for the Samza Runner, a couple of tests are failing with
errors like


java.lang.ClassCastException: java.lang.Integer cannot be cast to [B

I noticed these tests have the same symptom on the Flink Runner as well, where
they are currently excluded:

https://issues.apache.org/jira/browse/BEAM-12048
https://issues.apache.org/jira/browse/BEAM-12050


After some more digging, I realized that it is because of the combination of the
following facts:

1. TestStream is a primitive transform; therefore, runners are supposed to
translate it directly. The most intuitive implementation for each runner is to
parse the payload and decode TestStream.Event [1] in the runner process, to be
handed over to subsequent stages.
2. When TestStream is used with Integers, i.e. initialized with VarIntCoder:
since VarIntCoder is NOT a registered ModelCoder [2], it is treated as a custom
coder during conversion to the protobuf pipeline [3] and is replaced with a byte
array coder [4] when the runner sends data to the SDK worker.
3. Therefore an error occurs because the decoded TestStream.Event has an Integer
as its value, but the remote input receiver is expecting a byte array, causing
java.lang.ClassCastException: java.lang.Integer cannot be cast to [B
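The failing pattern boils down to something like this (a sketch mirroring the excluded tests):

    import org.apache.beam.sdk.coders.VarIntCoder;
    import org.apache.beam.sdk.testing.TestStream;

    // VarIntCoder is not a model coder, so on the wire the runner replaces it
    // with LengthPrefixCoder(ByteArrayCoder); the Integers decoded on the
    // runner side then hit: java.lang.Integer cannot be cast to [B
    TestStream<Integer> events =
        TestStream.create(VarIntCoder.of())
            .addElements(1, 2, 3)
            .advanceWatermarkToInfinity();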


In addition, I tried updating all these failing tests to use Long instead of
Integer, and all tests pass, since VarLongCoder is a known coder. I do
understand that the runner process does not have user artifacts staged, so it
can only use coders in the Beam model when communicating with the SDK worker process.

A couple of questions on this:

1. Is it expected that VarIntCoder is not a known coder?



Yes, since no one has worked to make it a well-known coder.


The notion of "integer" vs. "long" is also language-specific detail as
well, so not sure it makes sense as a well-known coder.

It can be made a well-known coder, and this would solve the immediate problem,
but not the long-term issue of portable TestStream not supporting arbitrary
types.


+1. Rather than making the coder a property of TestStream, I would be in
favor of the TestStream primitive always producing bytes (basically,
by definition), and providing a composite that consists of this
followed by a decoding step to give us a typed TestStream.
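One hypothetical shape for such a composite, reusing the decoding DoFn idea sketched earlier (all names are illustrative, not the actual proposal's code):

    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.testing.TestStream;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PBegin;
    import org.apache.beam.sdk.values.PCollection;

    // Typed TestStream as a composite: a bytes-only primitive followed by a
    // decoding ParDo (illustrative sketch).
    class TypedTestStream<T> extends PTransform<PBegin, PCollection<T>> {
      private final TestStream<byte[]> bytesPrimitive;
      private final Coder<T> elementCoder;

      TypedTestStream(TestStream<byte[]> bytesPrimitive, Coder<T> elementCoder) {
        this.bytesPrimitive = bytesPrimitive;
        this.elementCoder = elementCoder;
      }

      @Override
      public PCollection<T> expand(PBegin input) {
        return input
            .apply(bytesPrimitive)
            .apply(ParDo.of(new DecodeFromBytesFn<>(elementCoder)))
            .setCoder(elementCoder);
      }
    }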


2. Is TestStream always supposed to translate the payload as raw bytes, so that
the runner process can always send it to the SDK worker with the default byte
array coder and ask the SDK worker to decode accordingly?



Having the runner always treat it as bytes and not T is likely the best
solution, but it isn't strictly necessary.

3. If yes to 2), does it mean TestStream needs to be translated in a
completely different way in portable mode than in classic mode, since in
classic mode the translator can directly translate the payload to its final format?


There are a few ways to fix the current implementation to work for all types. One way would be to require the
encoded_element to be the "nested" encoding and then ensure that the runner uses a
WindowedValue<ByteArrayCoder in the outer context> and the SDK uses WindowedValue<T> (note that this
isn't WindowedValue<LengthPrefix<T>>) for the wire coders. This is quite annoying because the runner
inserts length prefixing in a lot of places (effectively every time it sees an unknown type), so we would need to
special-case this and propagate the correction through any runner-native transforms (e.g. GBK) until the SDK
consumes it.

Another way would be to ensure that the SDK always uses LengthPrefix<T> as the
PCollection encoding and the encoded_element format. This would mean that the runner
can translate it to a T if it so chooses and won't need the annoying special-case
propagation logic. This leaks the length prefixing into the SDK at graph construction
time, which is not what it was meant for.
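At the level of a single element, the difference between the two options is roughly the following (an illustrative sketch; elementCoder and value are placeholders):

    import java.io.ByteArrayOutputStream;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.LengthPrefixCoder;
    import org.apache.beam.sdk.util.CoderUtils;

    // Illustrative contrast of the two proposed element encodings.
    static <T> void contrastEncodings(Coder<T> elementCoder, T value) throws Exception {
      // Option 1: the payload is the element's "nested" encoding; the runner
      // treats it as opaque bytes and must track where prefixing is inserted.
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      elementCoder.encode(value, bos); // two-arg encode() = nested context
      byte[] nestedPayload = bos.toByteArray();

      // Option 2: the SDK declares LengthPrefixCoder.of(elementCoder) as the
      // PCollection coder, so the payload is already self-delimiting.
      byte[] prefixedPayload =
          CoderUtils.encodeToByteArray(LengthPrefixCoder.of(elementCoder), value);
    }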

Swapping to an existing well-known type is by far the easiest approach, as you
discovered, and it won't impact the correctness of the tests.


Best,
Ke


[1] 
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java#L52
[2] 
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java#L65
[3] 
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java#L99
[4] 
https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L93



