This looks similar to (and likely has the same cause as) what I experienced when adding support for the primitive Read to Flink. The final solution would be to make SDK coders known to a runner of the same SDK (this has already come up in several other threads). Until then, the solution seems to be something like [1]. The root cause is that the executable stage expects its input to be encoded by the SDK harness, and that step is missing when the transform is inlined (like Read in my case, or TestStream in yours). The intoWireTypes method simulates precisely this step: it encodes the PCollection with the coder defined in the SDK harness and then decodes it with the coder defined by the runner (the two match at the binary level but produce different types).
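
As a rough illustration of the encode-then-decode step described above, here is a minimal plain-JDK sketch. It is not the Flink translator code: the class and method names are hypothetical, a base-128 varint stands in for the SDK-side coder, and a length-prefixed byte array stands in for the runner-side coder.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

/** Plain-JDK simulation only; none of these names are Beam APIs. */
final class WireTypeSketch {

  /** Writes an int as a base-128 varint (7 bits per byte, low bits first). */
  static void writeVarInt(int value, ByteArrayOutputStream out) {
    while ((value & ~0x7F) != 0) {
      out.write((value & 0x7F) | 0x80);
      value >>>= 7;
    }
    out.write(value);
  }

  /** Reads a varint produced by writeVarInt. */
  static int readVarInt(ByteArrayInputStream in) {
    int result = 0;
    int shift = 0;
    while (true) {
      int b = in.read();
      if (b < 0) {
        throw new IllegalArgumentException("unexpected end of stream");
      }
      result |= (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        return result;
      }
      shift += 7;
    }
  }

  /** SDK side (assumed): the element's own encoding, preceded by its length. */
  static byte[] sdkEncode(int element) {
    ByteArrayOutputStream payload = new ByteArrayOutputStream();
    writeVarInt(element, payload);
    ByteArrayOutputStream wire = new ByteArrayOutputStream();
    writeVarInt(payload.size(), wire);
    wire.write(payload.toByteArray(), 0, payload.size());
    return wire.toByteArray();
  }

  /** Runner side (assumed): the same bytes, read back as a length-prefixed byte array. */
  static byte[] runnerDecode(byte[] wire) {
    ByteArrayInputStream in = new ByteArrayInputStream(wire);
    int length = readVarInt(in);
    byte[] payload = new byte[length];
    int read = length == 0 ? 0 : in.read(payload, 0, length);
    if (read != length) {
      throw new IllegalArgumentException("truncated payload");
    }
    return payload;
  }

  /** Integer in, byte[] out: identical bytes on the wire, different Java types. */
  static byte[] intoWireType(int element) {
    return runnerDecode(sdkEncode(element));
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(intoWireType(300))); // prints [-84, 2]
  }
}
```

The runner never needs to understand the payload; it only needs to agree with the SDK on where the element starts and ends.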

 Jan

[1] https://github.com/apache/beam/blob/dd7945f9f259a2989f9396d1d7a8dcb122711a52/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L657

On 8/31/21 7:27 PM, Luke Cwik wrote:
I originally wasn't in favor of making it a composite because it changes the "graph" structure, but the more I think about it, the more I like it.

On Tue, Aug 31, 2021 at 10:06 AM Robert Bradshaw wrote:

    On Tue, Aug 31, 2021 at 9:18 AM Luke Cwik wrote:
    >
    > On Mon, Aug 30, 2021 at 7:07 PM Ke Wu wrote:
    >>
    >> Hello everyone,
    >>
    >> This is Ke. I am working on enabling TestStream support for the
    Samza Runner in portable mode and discovered something unexpected.
    >>
    >> In my implementation for the Samza Runner, a couple of tests are
    failing with errors like
    >>
    >>
    >> java.lang.ClassCastException: java.lang.Integer cannot be cast to [B
    >>
    >> I noticed that these tests, which are currently excluded, show
    the same symptom on the Flink Runner as well:
    >>
    >> https://issues.apache.org/jira/browse/BEAM-12048
    >> https://issues.apache.org/jira/browse/BEAM-12050
    >>
    >>
    >> After some more digging, I realized that it is caused by the
    combination of the following facts:
    >>
    >> TestStream is a primitive transform, so runners are supposed to
    translate it directly. The most intuitive implementation for each
    runner is to parse the payload and decode TestStream.Event [1] in
    the runner process, to be handed over to subsequent stages.
    >> When TestStream is initialized with Integers, i.e. with
    VarIntCoder, VarIntCoder is NOT a registered ModelCoder [2], so it
    is treated as a custom coder during conversion to the protobuf
    pipeline [3] and is replaced with a byte array coder [4] when the
    runner sends data to the SDK worker.
    >> Therefore an error occurs: the decoded TestStream.Event has an
    Integer as its value, but the remote input receiver expects a byte
    array, causing java.lang.ClassCastException: java.lang.Integer
    cannot be cast to [B
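
The failure mode in the last point can be reproduced without Beam. The following is a minimal sketch, assuming a hypothetical receiver that was built for the byte-array wire coder and therefore casts whatever it is handed to byte[]; the names are illustrative and not taken from the Samza or Flink runners.

```java
/** Plain-JDK illustration only; not runner code. */
final class CastFailureSketch {

  /** Stand-in for a receiver that expects elements in byte-array wire form. */
  static int receiveAsBytes(Object element) {
    byte[] bytes = (byte[]) element; // this cast fails for an already-decoded Integer
    return bytes.length;
  }

  /** Returns true when handing the element to the receiver raises ClassCastException. */
  static boolean failsWithClassCast(Object element) {
    try {
      receiveAsBytes(element);
      return false;
    } catch (ClassCastException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(failsWithClassCast(Integer.valueOf(1))); // prints true
    System.out.println(failsWithClassCast(new byte[] {1}));     // prints false
  }
}
```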
    >>
    >>
    >> In addition, I tried updating all these failing tests to use
    Long instead of Integer, and all of them pass since VarLongCoder
    is a known coder. I do understand that the runner process does not
    have user artifacts staged, so it can only use coders in the Beam
    model when communicating with the SDK worker process.
    >>
    >> A couple of questions on this:
    >>
    >> 1. Is it expected that VarIntCoder is not a known coder?
    >
    >
    > Yes, since no one has worked on making it a well-known coder.

    The notion of "integer" vs. "long" is also a language-specific detail,
    so I am not sure it makes sense as a well-known coder.

    > It can be made a well-known coder, which would solve the
    immediate problem but not the long-term issue of portable
    TestStream not supporting arbitrary types.

    +1. Rather than making the coder a property of TestStream, I would be
    in favor of the TestStream primitive always producing bytes (basically,
    by definition), and providing a composite that consists of this
    followed by a decode step to give us a typed TestStream.
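
A minimal plain-JDK sketch of that shape, under stated assumptions: the "primitive" only ever yields encoded elements, and the "composite" is that primitive followed by a decode function supplied on the SDK side. All names are hypothetical, and the fixed-width int encoding is only a stand-in for whichever coder the SDK would actually use.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Plain-JDK illustration only; none of these names are Beam APIs. */
final class TypedTestStreamSketch {

  /** Stand-in for the proposed primitive: its output is always bytes. */
  static List<byte[]> bytesTestStream(List<byte[]> encodedElements) {
    return new ArrayList<>(encodedElements);
  }

  /** Stand-in for the composite: the bytes primitive followed by a decode step. */
  static <T> List<T> typedTestStream(List<byte[]> encodedElements, Function<byte[], T> decoder) {
    List<T> decoded = new ArrayList<>();
    for (byte[] element : bytesTestStream(encodedElements)) {
      decoded.add(decoder.apply(element));
    }
    return decoded;
  }

  /** Hypothetical fixed-width (4-byte, big-endian) int encoding used only in this example. */
  static byte[] encodeInt(int value) {
    return ByteBuffer.allocate(4).putInt(value).array();
  }

  /** Inverse of encodeInt. */
  static int decodeInt(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getInt();
  }

  public static void main(String[] args) {
    List<byte[]> encoded = Arrays.asList(encodeInt(1), encodeInt(2), encodeInt(3));
    System.out.println(typedTestStream(encoded, TypedTestStreamSketch::decodeInt)); // prints [1, 2, 3]
  }
}
```

With this split, the runner only ever handles byte arrays, and the element type exists solely inside the decode step.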


    >> 2. Is TestStream always supposed to translate the payload as
    raw bytes, so that the runner process can always send it to the
    SDK worker with the default byte array coder and ask the SDK
    worker to decode it accordingly?
    >
    >
    > Having the runner always treat it as bytes and not T is likely
    the best solution, but it isn't necessary.
    >
    >> 3. If yes to 2), does that mean TestStream needs to be
    translated in a completely different way in portable mode than in
    classic mode, since in classic mode the translator can directly
    translate the payload to its final format?
    >>
    >
    > There are a few ways to fix the current implementation to work
    for all types. One way would be to require the encoded_element
    to use the "nested" encoding and then ensure that the runner uses
    a WindowedValue<ByteArrayCoder in outer context> and the SDK uses
    WindowedValue<T> (note that this isn't
    WindowedValue<LengthPrefix<T>>) for the wire coders. This is quite
    annoying because the runner inserts length prefixing in a lot of
    places (effectively every time it sees an unknown type), so we
    would need to special-case this and propagate the correction
    through any runner-native transforms (e.g. GBK) until the SDK
    consumes it.
    >
    > Another way would be to ensure that the SDK always uses
    LengthPrefix<T> as the PCollection encoding and the
    encoded_element format. This would mean that the runner can
    translate it to a T if it so chooses and won't have the annoying
    special case propagation logic. This leaks the length prefixing
    into the SDK at graph construction time which is not what it was
    meant for.
    >
    > Swapping to an existing well-known type is by far the
    easiest approach, as you discovered, and won't impact the
    correctness of the tests.
    >
    >>
    >> Best,
    >> Ke
    >>
    >>
    >> [1] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java#L52
    >> [2] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java#L65
    >> [3] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java#L99
    >> [4] https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L93
