On 9/1/21 12:41 AM, Ke Wu wrote:
Read does not have a translation in portability, so it needs to be a
primitive transform explicitly implemented by the runner. The
encoding/decoding has to happen in the runner.
Could you help me understand this a bit more? IIRC, the fact that Read
is *NOT* translated in portable mode means exactly that it is a
composite transform rather than a primitive, because all primitive
transforms are required to be translated. In addition, Read is a
composite built on Impulse, which produces dummy bytes [1] to trigger
the subsequent ParDo/ExecutableStage, where decoding of the actual
source happens [2]
Sorry, I was referring to the use_deprecated_read Read transform (not
the SDF one). That is the primitive Read, which has no translation on
the SDK harness side. [1]
[1]
https://lists.apache.org/thread.html/r42284d641a133ead6d80a5af01ac8bd4e01f1fba4197d0018f092f52%40%3Cdev.beam.apache.org%3E
There seems to be no role for the SDK harness with regard to
TestStream, because the elements are already encoded by the
submitting SDK. The coders must match nevertheless, because you can
have Events of a nested type such as
WindowedValue<KV<Integer, KV<Integer, Object>>>, and what will and
what will not get length-prefixed depends on which parts exactly are
"known" (model) coders and which are not. Encoding the whole value as
a single byte array will not work for the consuming SDK harness,
which will expect to see nested KvCoders instead.
I don’t think I fully understand what you are saying here. TestStream
is currently a primitive transform, therefore there is no role for the
SDK harness. This is what the proposal would change: make TestStream a
composite transform consisting of a primitive transform and a
subsequent ParDo that decodes to the desired format.
The problem is that the _desired format_ depends on the (source) coder.
SDK coders and model coders have different representations. The latter
will not be (itself) length-prefixed, but will be recursively
introspected, and only the truly unknown sub-coders will be
length-prefixed. That is why the coder of the TestStream elements has
to be known to the runner.
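A minimal sketch of this recursion, using Beam's Java coder classes
(the pipeline coder below is an assumed example, not taken from this
thread): KvCoder is a model coder, so the runner introspects it and
wraps only the unknown sub-coder, rather than length-prefixing the
whole value.

    import org.apache.beam.sdk.coders.ByteArrayCoder;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.KvCoder;
    import org.apache.beam.sdk.coders.LengthPrefixCoder;
    import org.apache.beam.sdk.coders.VarLongCoder;
    import org.apache.beam.sdk.values.KV;

    class WireCoderSketch {
      // Pipeline coder on the SDK side: KvCoder.of(VarLongCoder.of(), myCustomCoder).
      // Runner-side wire coder: KvCoder is a model coder, so it is kept and
      // introspected; only the unknown value sub-coder becomes prefixed bytes.
      static Coder<KV<Long, byte[]>> wireCoder() {
        return KvCoder.of(
            VarLongCoder.of(),
            LengthPrefixCoder.of(ByteArrayCoder.of()));
      }
      // By contrast, LengthPrefixCoder.of(ByteArrayCoder.of()) over the whole
      // KV would be binary-incompatible with the nested form above.
    }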
[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Impulse.java#L39
[2]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L149
On Aug 31, 2021, at 3:21 PM, Jan Lukavský <[email protected]> wrote:
On 9/1/21 12:13 AM, Ke Wu wrote:
Hi Jan,
Here is my understanding,
The runner is brought up by the job server driver, which is up and
running before job submission, i.e. it is job agnostic. Therefore, the
runner does not have any SDK coders available, and artifact staging
only happens for SDK workers.
You are right that Read and TestStream are sources; however, the one
thing that distinguishes them is that the Read transform is a composite
transform, and the decoding happens in the ParDo/ExecutableStage, i.e.
on the SDK worker.
Read does not have a translation in portability, so it needs to be a
primitive transform explicitly implemented by the runner. The
encoding/decoding has to happen in the runner.
The proposal here is also to make the public-facing TestStream
transform a composite transform instead of a primitive, so that the
decoding occurs on the SDK worker side where the SDK coder is
available. The primitive that powers TestStream would be directly
translated by the runner to always produce raw bytes, and these raw
bytes would be decoded on the SDK worker side.
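A hedged sketch of that shape (RawTestStreamPrimitive and
TypedTestStream are illustrative names, not an actual Beam API): the
primitive always outputs the pre-encoded bytes from the payload, and a
ParDo running on the SDK worker decodes them with the SDK coder.

    import org.apache.beam.sdk.coders.ByteArrayCoder;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.util.CoderUtils;
    import org.apache.beam.sdk.values.PBegin;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.WindowingStrategy;

    // Hypothetical primitive: translated directly by the runner, always
    // producing the raw encoded bytes carried in the TestStream payload.
    class RawTestStreamPrimitive extends PTransform<PBegin, PCollection<byte[]>> {
      @Override
      public PCollection<byte[]> expand(PBegin input) {
        return PCollection.createPrimitiveOutputInternal(
            input.getPipeline(),
            WindowingStrategy.globalDefault(),
            PCollection.IsBounded.UNBOUNDED,
            ByteArrayCoder.of());
      }
    }

    // Composite wrapper: decoding happens in a ParDo, i.e. on the SDK
    // worker, where the (possibly custom) SDK coder is available.
    class TypedTestStream<T> extends PTransform<PBegin, PCollection<T>> {
      private final Coder<T> coder;

      TypedTestStream(Coder<T> coder) {
        this.coder = coder;
      }

      @Override
      public PCollection<T> expand(PBegin input) {
        return input
            .apply(new RawTestStreamPrimitive())
            .apply(ParDo.of(new DoFn<byte[], T>() {
              @ProcessElement
              public void process(@Element byte[] bytes, OutputReceiver<T> out)
                  throws Exception {
                out.output(CoderUtils.decodeFromByteArray(coder, bytes));
              }
            }))
            .setCoder(coder);
      }
    }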
There seems to be no role for the SDK harness with regard to
TestStream, because the elements are already encoded by the
submitting SDK. The coders must match nevertheless, because you can
have Events of a nested type such as
WindowedValue<KV<Integer, KV<Integer, Object>>>, and what will and
what will not get length-prefixed depends on which parts exactly are
"known" (model) coders and which are not. Encoding the whole value as
a single byte array will not work for the consuming SDK harness,
which will expect to see nested KvCoders instead.
Best,
Ke
On Aug 31, 2021, at 2:56 PM, Jan Lukavský <[email protected]> wrote:
Sorry if I'm missing something obvious, but I don't quite see the
difference between Read and TestStream regarding the discussed
issue with coders. A couple of thoughts:
a) both Read and TestStream are _sources_ - they produce elements
that are consumed by downstream transforms
b) the coder of a particular PCollection is defined by the
Pipeline proto - it is the (client-side) SDK that owns the Pipeline
and that defines all the coders
c) runners must adhere to these coders, because otherwise there is
a risk of coder mismatch, most probably on edges like x-lang
transforms or inlined transforms
I tried the approach of encoding the output of Read into a byte array
as well, but that turns out not to work once there is a (partially)
known coder in play, because the consuming transform (executable
stage) expects to see the wire coder. That is not simply a byte array
coder, because the type of the elements might be, for instance,
KV<K, V>, where KvCoder is one of the ModelCoders. That does not
encode using LengthPrefixCoder and as such will be incompatible with
LengthPrefixCoder(ByteArrayCoder). The TestStream needs to know the
coder of the elements, because it defines exactly where
length-prefixing must or must not be inserted. The logic in
LengthPrefixUnknownCoders [1] is recursive for ModelCoders.
[1]
https://github.com/apache/beam/blob/ff70e740a2155592dfcb302ff6303cc19660a268/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L48
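A small demonstration of the incompatibility, as a sketch with assumed
example values (not code from the thread): the nested wire form and
the whole-value-as-blob form produce different byte layouts for the
same KV.

    import org.apache.beam.sdk.coders.ByteArrayCoder;
    import org.apache.beam.sdk.coders.CoderException;
    import org.apache.beam.sdk.coders.KvCoder;
    import org.apache.beam.sdk.coders.LengthPrefixCoder;
    import org.apache.beam.sdk.coders.VarLongCoder;
    import org.apache.beam.sdk.util.CoderUtils;
    import org.apache.beam.sdk.values.KV;

    class NestedVsBlob {
      static void demo() throws CoderException {
        KV<Long, byte[]> kv = KV.of(1L, new byte[] {42});
        // Wire form the consuming harness expects: KvCoder is kept, and
        // only the unknown value sub-coder is length-prefixed.
        byte[] nested = CoderUtils.encodeToByteArray(
            KvCoder.of(VarLongCoder.of(),
                       LengthPrefixCoder.of(ByteArrayCoder.of())),
            kv);
        // Whole value as a single prefixed blob: a different byte layout,
        // which a consumer expecting nested KvCoders cannot read.
        byte[] blob = CoderUtils.encodeToByteArray(
            LengthPrefixCoder.of(ByteArrayCoder.of()),
            CoderUtils.encodeToByteArray(
                KvCoder.of(VarLongCoder.of(), ByteArrayCoder.of()), kv));
        assert !java.util.Arrays.equals(nested, blob);
      }
    }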
On 8/31/21 11:29 PM, Ke Wu wrote:
Awesome! Thank you Luke and Robert.
Also created https://issues.apache.org/jira/browse/BEAM-12828 to track
the unit test conversion. I could take it after I update the Samza
runner to support TestStream in portable mode.
On Aug 31, 2021, at 2:05 PM, Robert Bradshaw <[email protected]> wrote:
Created https://issues.apache.org/jira/browse/BEAM-12827 to track this.
+1 to converting tests to just use longs for better coverage for now.
Also, yes, this is very similar to the issues encountered with Reads,
but the solution is a bit simpler, as there is no need for the
TestStream primitive to interact with the decoded version of the
elements (unlike Reads, where the sources often produce elements in
un-encoded form) and no user code to run.
- Robert
On Tue, Aug 31, 2021 at 11:00 AM Jan Lukavský <[email protected]> wrote:
This looks similar (and likely has the same cause) to what I
experienced when making the primitive Read supported by Flink.
The final solution would be to make SDK coders known to the
runner of the same SDK (already discussed in various other
threads). But until then, the solution seems to be something
like [1]. The root cause is that the executable stage expects
its input to be encoded by the SDK harness, and that part is
missing when the transform is inlined (like Read in my case, or
TestStream in your case). The intoWireTypes method simulates
precisely this part - it encodes the PCollection via the coder
defined in the SDK harness and then decodes it via the coder
defined by the runner (the two match on the binary level, but
produce different types).
Jan
[1]
https://github.com/apache/beam/blob/dd7945f9f259a2989f9396d1d7a8dcb122711a52/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L657
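The gist of that simulation, as a minimal sketch (the helper name and
shape are illustrative; [1] is the actual Flink translator code):
re-encode each element with the SDK-side coder and decode it with the
runner-side wire coder, which agree on the binary level but produce
different in-memory types.

    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.CoderException;
    import org.apache.beam.sdk.util.CoderUtils;

    // Simulates the encode step the SDK harness would normally perform:
    // the bytes written by sdkCoder are read back with the runner's wire
    // coder, yielding the representation the executable stage expects.
    static <T, WireT> WireT toWireType(
        T element, Coder<T> sdkCoder, Coder<WireT> wireCoder)
        throws CoderException {
      byte[] bytes = CoderUtils.encodeToByteArray(sdkCoder, element);
      return CoderUtils.decodeFromByteArray(wireCoder, bytes);
    }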
On 8/31/21 7:27 PM, Luke Cwik wrote:
I originally wasn't for making it a composite because it changes
the "graph" structure, but the more I thought about it, the more I
liked it.
On Tue, Aug 31, 2021 at 10:06 AM Robert Bradshaw <[email protected]> wrote:
On Tue, Aug 31, 2021 at 9:18 AM Luke Cwik <[email protected]> wrote:
On Mon, Aug 30, 2021 at 7:07 PM Ke Wu <[email protected]> wrote:
Hello everyone,
This is Ke. I am working on enabling TestStream support for the
Samza Runner in portable mode and discovered something unexpected.
In my implementation for the Samza Runner, a couple of tests are
failing with errors like
java.lang.ClassCastException: java.lang.Integer cannot be
cast to [B
I noticed these tests have the same symptom on the Flink Runner
as well, where they are currently excluded:
https://issues.apache.org/jira/browse/BEAM-12048
https://issues.apache.org/jira/browse/BEAM-12050
After some more digging, I realized that it is caused by the
combination of the following facts:
1. TestStream is a primitive transform; therefore, runners are
supposed to translate it directly. The most intuitive
implementation for each runner is to parse the payload and decode
the TestStream.Events [1] on the runner process, to be handed over
to subsequent stages.
2. When TestStream is used with Integers, i.e. initialized with
VarIntCoder: since VarIntCoder is NOT a registered ModelCoder
[2], it is treated as a custom coder during conversion to the
protobuf pipeline [3] and is replaced with a byte array coder [4]
when the runner sends data to the SDK worker.
3. Therefore an error occurs because the decoded
TestStream.Event has an Integer as its value, but the remote
input receiver is expecting a byte array, causing
java.lang.ClassCastException: java.lang.Integer cannot be
cast to [B
In addition, I tried updating all the failed tests to use Long
instead of Integer, and all of them pass, since VarLongCoder is a
known coder. I do understand that the runner process does not have
user artifacts staged, so it can only use coders in the Beam model
when communicating with the SDK worker process.
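For concreteness, a minimal sketch of the two setups using the
standard TestStream builder API (element values are made up for
illustration):

    import org.apache.beam.sdk.coders.VarIntCoder;
    import org.apache.beam.sdk.coders.VarLongCoder;
    import org.apache.beam.sdk.testing.TestStream;

    class IntVsLongTestStream {
      // Fails in portable mode: VarIntCoder is not a model coder, so it is
      // replaced with length-prefixed bytes on the runner side, and the
      // decoded Integer events no longer match the expected byte[] input.
      static final TestStream<Integer> INTS =
          TestStream.create(VarIntCoder.of())
              .addElements(1, 2, 3)
              .advanceWatermarkToInfinity();

      // Passes: VarLongCoder is a registered model coder, so the runner
      // and the SDK worker agree on the encoding.
      static final TestStream<Long> LONGS =
          TestStream.create(VarLongCoder.of())
              .addElements(1L, 2L, 3L)
              .advanceWatermarkToInfinity();
    }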
A couple of questions on this:
1. Is it expected that VarIntCoder is not a known coder?
Yes, since no one has worked on making it a well-known coder.
The notion of "integer" vs. "long" is also a language-specific
detail, so I'm not sure it makes sense as a well-known coder.
It can be made a well-known coder, and this would solve the
immediate problem, but not the long-term issue of portable
TestStream not supporting arbitrary types.
+1. Rather than making the coder a property of TestStream, I would
be in favor of the TestStream primitive always producing bytes
(basically, by definition), and providing a composite that consists
of this followed by a decoding step, to give us a typed TestStream.
2. Is the TestStream payload always supposed to be translated as
raw bytes, so that the runner process can always send it to the
SDK worker with the default byte array coder and ask the SDK
worker to decode accordingly?
Having the runner treat it always as bytes and not T is likely
the best solution but isn't necessary.
3. If yes to 2), does that mean TestStream needs to be translated
in a completely different way in portable mode than in classic
mode, since in classic mode the translator can directly translate
the payload to its final format?
There are a few ways to fix the current implementation to work
for all types. One way would be to require the encoded_element
to use the "nested" encoding, and to then ensure that the runner
uses a WindowedValue<ByteArrayCoder in outer context> and the SDK
uses WindowedValue<T> (note that this isn't
WindowedValue<LengthPrefix<T>>) for the wire coders. This is
quite annoying because the runner inserts length prefixing in a
lot of places (effectively every time it sees an unknown type),
so we would need to special-case this and propagate the
correction through any runner-native transforms (e.g. GBK) until
the SDK consumes it.
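A sketch of that first wire-coder pair, using Beam's WindowedValue
coder helpers (the wrapper methods are illustrative, and this glosses
over the nested vs. outer context detail mentioned above):

    import org.apache.beam.sdk.coders.ByteArrayCoder;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.util.WindowedValue;

    class AsymmetricWireCoders {
      // Runner side: elements stay opaque bytes.
      static Coder<WindowedValue<byte[]>> runnerWireCoder(
          Coder<? extends BoundedWindow> windowCoder) {
        return WindowedValue.getFullCoder(ByteArrayCoder.of(), windowCoder);
      }

      // SDK side: elements carry the real type T; note that neither side
      // uses WindowedValue<LengthPrefix<T>> here.
      static <T> Coder<WindowedValue<T>> sdkWireCoder(
          Coder<T> elementCoder, Coder<? extends BoundedWindow> windowCoder) {
        return WindowedValue.getFullCoder(elementCoder, windowCoder);
      }
    }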
Another way would be to ensure that the SDK always uses
LengthPrefix<T> as the PCollection encoding and as the
encoded_element format. This would mean that the runner can
translate it to a T if it so chooses, and won't need the
annoying special-case propagation logic. However, this leaks the
length prefixing into the SDK at graph construction time, which
is not what it was meant for.
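The second option, sketched in the same terms (again illustrative, not
an actual API):

    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.LengthPrefixCoder;

    class PrefixedPCollectionCoder {
      // The SDK declares the PCollection coder as LengthPrefix<T> at graph
      // construction time, so the runner sees a length-delimited encoding
      // it can pass through as bytes, or decode to T if it knows the
      // sub-coder.
      static <T> Coder<T> testStreamCoder(Coder<T> elementCoder) {
        return LengthPrefixCoder.of(elementCoder);
      }
    }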
Swapping to an existing well-known type is by far the easiest
approach, as you discovered, and won't impact the correctness of
the tests.
Best,
Ke
[1]
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java#L52
[2]
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java#L65
[3]
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java#L99
[4]
https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L93