Sorry if I'm missing something obvious, but I don't quite see the
difference between Read and TestStream regarding the discussed issue
with coders. A couple of thoughts:
a) both Read and TestStream are _sources_: they produce elements that
are consumed by downstream transforms
b) the coder of a particular PCollection is defined by the Pipeline
proto; it is the (client-side) SDK that owns the Pipeline and
defines all the coders
c) runners must adhere to these coders, because otherwise there is a
risk of coder mismatch, most probably on edges such as x-lang transforms or
inlined transforms
I tried the approach of encoding the output of Read into a byte array as
well, but it turns out that this does not work once a (partially) known
coder is in play: the consuming transform (executable stage) expects to
see the wire coder, which is not simply a byte array, because the
elements might be, for instance, of type KV<K, V>, and KvCoder is one of
the ModelCoders. A KvCoder is not wrapped in a LengthPrefixCoder (only
its unknown components are), so its encoding is incompatible with
LengthPrefixCoder(ByteArrayCoder). The TestStream needs to know the
coder of its elements, because that coder determines exactly where
length prefixing must or must not be inserted. The logic in
LengthPrefixUnknownCoders [1] is recursive for ModelCoders.
[1]
https://github.com/apache/beam/blob/ff70e740a2155592dfcb302ff6303cc19660a268/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L48
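A minimal, self-contained sketch of that recursion, assuming a toy coder tree: CoderSpec, WireCoderSketch, wireCoder and the set of "known" coder names are hypothetical simplifications that only mirror the idea behind LengthPrefixUnknownCoders, not Beam's API. Known model coders such as KV are kept and only their unknown components get a length prefix (replaced by raw bytes on the runner side), so a KV<Long, Integer> never becomes a single LengthPrefix<ByteArray>.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of a coder: a name plus component coders.
final class CoderSpec {
    final String urn;
    final List<CoderSpec> components;

    CoderSpec(String urn, CoderSpec... components) {
        this.urn = urn;
        this.components = List.of(components);
    }

    @Override
    public String toString() {
        if (components.isEmpty()) return urn;
        StringBuilder sb = new StringBuilder(urn).append('<');
        for (int i = 0; i < components.size(); i++) {
            if (i > 0) sb.append(", ");
            sb.append(components.get(i));
        }
        return sb.append('>').toString();
    }
}

class WireCoderSketch {
    // Assumed subset of model coders the runner understands (simplified names, not URNs).
    // This sketch ignores coders that are already length-prefixed.
    static final Set<String> KNOWN = Set.of("KV", "VarLong", "Bytes");

    // Recurses into known coders; wraps each unknown coder in a length prefix.
    // On the runner side the unknown coder itself is replaced by raw bytes.
    static CoderSpec wireCoder(CoderSpec coder, boolean runnerSide) {
        if (!KNOWN.contains(coder.urn)) {
            CoderSpec inner = runnerSide ? new CoderSpec("Bytes") : coder;
            return new CoderSpec("LengthPrefix", inner);
        }
        List<CoderSpec> rewritten = new ArrayList<>();
        for (CoderSpec component : coder.components) {
            rewritten.add(wireCoder(component, runnerSide));
        }
        return new CoderSpec(coder.urn, rewritten.toArray(new CoderSpec[0]));
    }

    public static void main(String[] args) {
        CoderSpec kv = new CoderSpec("KV", new CoderSpec("VarLong"), new CoderSpec("VarInt"));
        System.out.println(wireCoder(kv, false)); // prints KV<VarLong, LengthPrefix<VarInt>>
        System.out.println(wireCoder(kv, true));  // prints KV<VarLong, LengthPrefix<Bytes>>
    }
}
```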
On 8/31/21 11:29 PM, Ke Wu wrote:
Awesome! Thank you, Luke and Robert.
I also created https://issues.apache.org/jira/browse/BEAM-12828 to track
the unit test conversion. I can take it after I update the Samza runner
to support TestStream in portable mode.
On Aug 31, 2021, at 2:05 PM, Robert Bradshaw wrote:
Created https://issues.apache.org/jira/browse/BEAM-12827 to track this.
+1 to converting tests to just use longs for better coverage for now.
Also, yes, this is very similar to the issues encountered by Reads,
but the solution is a bit simpler as there's no need for the
TestStream primitive to interact with the decoded version of the
elements (unlike Reads, where the sources often give elements in
un-encoded form) and no user code to run.
- Robert
On Tue, Aug 31, 2021 at 11:00 AM Jan Lukavský wrote:
This looks similar to (and likely has the same cause as) what I
experienced when adding support for primitive Read to Flink. The final
solution would be to make SDK coders known to the runner of the same
SDK (already discussed in several other threads). Until then,
the solution seems to be something like [1]. The root cause is that
the executable stage expects its input to be encoded by the SDK
harness, and that step is missing when the transform is inlined
(like Read in my case, or TestStream in yours). The
intoWireTypes method simulates precisely this step: it encodes the
PCollection elements using the coder defined in the SDK harness and then
decodes them using the coder defined by the runner (the two match at the
binary level, but produce different types).
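A byte-level sketch of the intoWireTypes idea, under simplifying assumptions: an Integer element is encoded with an SDK-side wire coder modelled as LengthPrefix<VarInt>, then decoded with the runner-side wire coder LengthPrefix<ByteArray>. IntoWireTypesSketch and its methods are hypothetical, windowing is omitted, and the varint helpers only handle non-negative values; this is not the Flink runner code. The bytes are identical on both sides, but the runner ends up holding a byte[] instead of an Integer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

class IntoWireTypesSketch {
    // Writes a non-negative varint: 7 bits per byte, high bit set on all but the last byte.
    static void writeVarInt(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
    }

    // Reads one varint written by writeVarInt.
    static long readVarInt(ByteArrayInputStream in) {
        long result = 0;
        int shift = 0;
        int b;
        do {
            b = in.read();
            if (b < 0) throw new IllegalArgumentException("truncated varint");
            result |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return result;
    }

    // SDK-side wire coder for an Integer with an unknown coder: LengthPrefix<VarInt>.
    static byte[] encodeSdkSide(int value) {
        ByteArrayOutputStream payload = new ByteArrayOutputStream();
        writeVarInt(payload, value);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarInt(out, payload.size());
        out.write(payload.toByteArray(), 0, payload.size());
        return out.toByteArray();
    }

    // Runner-side wire coder: LengthPrefix<ByteArray>, which yields an opaque byte[].
    static byte[] decodeRunnerSide(byte[] encoded) {
        ByteArrayInputStream in = new ByteArrayInputStream(encoded);
        byte[] bytes = new byte[(int) readVarInt(in)];
        if (bytes.length > 0 && in.read(bytes, 0, bytes.length) != bytes.length) {
            throw new IllegalArgumentException("truncated element");
        }
        return bytes;
    }

    public static void main(String[] args) {
        byte[] onWire = encodeSdkSide(300);
        byte[] runnerView = decodeRunnerSide(onWire);
        System.out.println(Arrays.toString(onWire));     // prints [2, -84, 2]
        System.out.println(Arrays.toString(runnerView)); // prints [-84, 2]
    }
}
```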
Jan
[1]
https://github.com/apache/beam/blob/dd7945f9f259a2989f9396d1d7a8dcb122711a52/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L657
On 8/31/21 7:27 PM, Luke Cwik wrote:
I originally wasn't in favor of making it a composite because it changes the
"graph" structure, but the more I think about it, the more I like it.
On Tue, Aug 31, 2021 at 10:06 AM Robert Bradshaw wrote:
On Tue, Aug 31, 2021 at 9:18 AM Luke Cwik wrote:
On Mon, Aug 30, 2021 at 7:07 PM Ke Wu wrote:
Hello everyone,
This is Ke. I am working on enabling TestStream support for the Samza
runner in portable mode and discovered something unexpected.
With my implementation for the Samza runner, a couple of tests
fail with errors like
java.lang.ClassCastException: java.lang.Integer cannot be cast to [B
I noticed that these tests show the same symptom on the Flink runner as
well, where they are currently excluded:
https://issues.apache.org/jira/browse/BEAM-12048
https://issues.apache.org/jira/browse/BEAM-12050
After some more digging, I realized that this is caused by a
combination of the following facts:
TestStream is a primitive transform, so runners are expected to
translate it directly. The most intuitive implementation for each
runner is to parse the payload and decode the TestStream.Event [1] in
the runner process, then hand the events over to subsequent stages.
When TestStream is used with Integers, i.e. initialized with VarIntCoder,
VarIntCoder is NOT a registered ModelCoder [2], so it is treated as a
custom coder during conversion to the protobuf pipeline [3] and is
replaced with a byte array coder [4] when the runner sends data to the
SDK worker.
Therefore an error occurs: the decoded TestStream.Event has an Integer
as its value, but the remote input receiver expects a byte array,
causing java.lang.ClassCastException:
java.lang.Integer cannot be cast to [B
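The failure mode can be reproduced in isolation. In this sketch, receive is a hypothetical stand-in for the remote input receiver, which (because the runner-side wire coder is a byte array coder) treats every element as a byte[]. Passing it the Integer decoded from the TestStream payload throws the same ClassCastException, while the still-encoded bytes are accepted. The exact exception message differs between JDK versions.

```java
class ClassCastSketch {
    // Hypothetical stand-in for the remote input receiver: with a byte array
    // wire coder on the runner side, every element is treated as a byte[].
    static byte[] receive(Object element) {
        return (byte[]) element;
    }

    public static void main(String[] args) {
        // The runner decoded the TestStream payload with VarIntCoder, so it holds an Integer.
        Object decoded = Integer.valueOf(42);
        try {
            receive(decoded);
        } catch (ClassCastException e) {
            System.out.println("Fails as in the tests: " + e.getMessage());
        }
        // Passing the still-encoded element (varint 42 is the single byte 0x2A) works.
        byte[] encoded = receive(new byte[] {0x2A});
        System.out.println(encoded.length); // prints 1
    }
}
```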
In addition, I tried updating all of these failing tests to use Long
instead of Integer, and all of them pass, since VarLongCoder is
a known coder. I do understand that the runner process does not have
user artifacts staged, so it can only use coders in the Beam model
when communicating with the SDK worker process.
A couple of questions on this:
1. Is it expected that VarIntCoder is not a known coder?
Yes, since no one has worked on making it a well-known coder.
The notion of "integer" vs. "long" is also a language-specific detail,
so I'm not sure it makes sense as a well-known coder.
It can be made a well-known coder, and this would solve the
immediate problem, but not the long-term issue of portable
TestStream not supporting arbitrary types.
+1. Rather than making the coder a property of TestStream, I would be in
favor of the TestStream primitive always producing bytes (basically,
by definition), and providing a composite that consists of this
primitive followed by a decoding step to give us a typed TestStream.
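A minimal sketch of the proposed shape, assuming plain Java lists in place of PCollections: the hypothetical primitiveTestStream only ever carries byte[] elements, so a runner can pass them along without knowing T, and typedTestStream is the composite that appends a decoding step running where the element coder is available. decodeVarInt stands in for VarIntCoder and handles non-negative values only; none of these names are Beam APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class BytesTestStreamSketch {
    // Hypothetical "primitive": only ever yields encoded elements as byte[],
    // so the runner never needs to know the element type.
    static List<byte[]> primitiveTestStream(List<byte[]> encodedElements) {
        return new ArrayList<>(encodedElements);
    }

    // Hypothetical composite: the byte-producing primitive followed by a decoding
    // step that runs where the user's coder is available (the SDK).
    static <T> List<T> typedTestStream(List<byte[]> encodedElements, Function<byte[], T> decoder) {
        List<T> out = new ArrayList<>();
        for (byte[] bytes : primitiveTestStream(encodedElements)) {
            out.add(decoder.apply(bytes));
        }
        return out;
    }

    // Stand-in for VarIntCoder: decodes a non-negative varint.
    static Integer decodeVarInt(byte[] bytes) {
        int result = 0;
        for (int i = 0; i < bytes.length; i++) {
            result |= (bytes[i] & 0x7F) << (7 * i);
            if ((bytes[i] & 0x80) == 0) return result;
        }
        throw new IllegalArgumentException("truncated varint");
    }

    public static void main(String[] args) {
        List<byte[]> encoded = List.of(new byte[] {0x01}, new byte[] {(byte) 0xAC, 0x02});
        System.out.println(typedTestStream(encoded, BytesTestStreamSketch::decodeVarInt)); // prints [1, 300]
    }
}
```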
2. Is TestStream always supposed to translate the payload as
raw bytes, so that the runner process can always send it to the SDK
worker with the default byte array coder and ask the SDK worker to
decode it accordingly?
Having the runner always treat it as bytes rather than T is likely the
best solution, but it isn't necessary.
3. If yes to 2), does that mean TestStream needs to be
translated in a completely different way in portable mode than in
classic mode, since in classic mode the translator can directly
translate the payload to its final format?
There are a few ways to fix the current implementation to work for
all types. One way would be to require the encoded_element to
be the "nested" encoding and then ensure that the runner uses
WindowedValue<ByteArrayCoder in outer context> and the SDK uses
WindowedValue<T> (note that this isn't
WindowedValue<LengthPrefix<T>>) for the wire coders. This is quite
annoying because the runner inserts length prefixing in a lot of
places (effectively every time it sees an unknown type), so we
would need to special-case this and propagate this correction
through any runner-native transforms (e.g. GBK) until the SDK
consumes it.
Another way would be to ensure that the SDK always uses
LengthPrefix<T> as the PCollection encoding and the
encoded_element format. This would mean that the runner can
translate it to a T if it so chooses and won't need the annoying
special-case propagation logic. However, this leaks length prefixing
into the SDK at graph construction time, which is not what it was
meant for.
Switching to an existing well-known type is by far the easiest
approach, as you discovered, and won't impact the correctness of
the tests.
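To compare the two encoded_element layouts Luke describes, here is a byte-level sketch using an Integer T encoded as a non-negative varint, with windowing omitted. Option 1 sends the nested encoding as-is, which the SDK decodes directly as T, but if the runner length-prefixes it as it does for unknown coders, the SDK misreads the length as the value. Option 2 stores LengthPrefix<T>, which a reader can consume either as opaque bytes or as T. All class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

class EncodedElementOptionsSketch {
    // Non-negative varint; stands in for the nested encoding of an Integer T.
    static byte[] varint(long value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
        return out.toByteArray();
    }

    static long readVarInt(ByteArrayInputStream in) {
        long result = 0;
        int shift = 0;
        int b;
        do {
            b = in.read();
            if (b < 0) throw new IllegalArgumentException("truncated varint");
            result |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return result;
    }

    // Prepends the varint length of the given bytes (what a length-prefix coder writes).
    static byte[] lengthPrefix(byte[] bytes) {
        byte[] len = varint(bytes.length);
        byte[] out = Arrays.copyOf(len, len.length + bytes.length);
        System.arraycopy(bytes, 0, out, len.length, bytes.length);
        return out;
    }

    // Option 1: the nested encoding of T, forwarded as a byte array in the outer
    // context, i.e. without a length prefix.
    static byte[] option1Wire(int value) {
        return varint(value);
    }

    // Option 2: encoded_element is LengthPrefix<T>.
    static byte[] option2Wire(int value) {
        return lengthPrefix(varint(value));
    }

    // SDK decoding T directly (the WindowedValue<T> wire coder of option 1).
    static long sdkReadsT(byte[] wire) {
        return readVarInt(new ByteArrayInputStream(wire));
    }

    // Reader that knows T: skips the length, then decodes T.
    static long readLengthPrefixedT(byte[] wire) {
        ByteArrayInputStream in = new ByteArrayInputStream(wire);
        readVarInt(in); // length of the nested encoding, not needed here
        return readVarInt(in);
    }

    // Reader without T: a length-prefixed byte array yields the opaque nested bytes.
    static byte[] readAsOpaqueBytes(byte[] wire) {
        ByteArrayInputStream in = new ByteArrayInputStream(wire);
        byte[] bytes = new byte[(int) readVarInt(in)];
        if (bytes.length > 0 && in.read(bytes, 0, bytes.length) != bytes.length) {
            throw new IllegalArgumentException("truncated element");
        }
        return bytes;
    }

    public static void main(String[] args) {
        System.out.println(sdkReadsT(option1Wire(300)));               // prints 300
        // Option 1 pitfall: a runner-inserted length prefix is misread as the value.
        System.out.println(sdkReadsT(lengthPrefix(option1Wire(300)))); // prints 2
        System.out.println(readLengthPrefixedT(option2Wire(300)));     // prints 300
        System.out.println(Arrays.toString(readAsOpaqueBytes(option2Wire(300)))); // prints [-84, 2]
    }
}
```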
Best,
Ke
[1]
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java#L52
[2]
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java#L65
[3]
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java#L99
[4]
https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L93