On 8/31/21 8:45 PM, Luke Cwik wrote:
I don't think we can make Java based runners use the SDKs coder since TestStream is also used within Go and Python pipelines.
That should be no problem; coders that are unknown to the runner (SDK) will remain unknown. If the Go SDK encodes an Event in a way that is not understood by the Java runner, it will react appropriately.

On Tue, Aug 31, 2021 at 11:00 AM Jan Lukavský <[email protected]> wrote:

    This looks similar to (and likely has the same cause as) what I
    experienced when making primitive Read supported by Flink. The
    final solution would be to make SDK coders known to the runner of
    the same SDK (already discussed in various other threads). But
    until then, the solution seems to be something like [1]. The root
    cause is that the executable stage expects its input to be encoded
    by the SDK harness, and that part is missing when the transform is
    inlined (like Read in my case, or TestStream in your case). The
    intoWireTypes method simulates precisely this part: it encodes
    the PCollection via the coder defined in the SDK harness and then
    decodes it with the coder defined by the runner (the two match on
    the binary level, but produce different types).
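A minimal sketch of "match on the binary level, but produce different types", not taken from Beam: the class and method names below are hypothetical, and the length-prefixed varint layout is an assumption chosen for illustration. The "SDK side" writes an integer as a length-prefixed varint; the "runner side" reads the same bytes as an opaque length-prefixed byte array without knowing what is inside.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical illustration only; these are not Beam's coder classes.
final class WireTypeSketch {
    // Base-128 varint, least significant 7-bit group first.
    static void writeVarInt(int value, ByteArrayOutputStream out) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    static int readVarInt(ByteBuffer in) {
        int result = 0;
        int shift = 0;
        while (true) {
            byte b = in.get();
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
            shift += 7;
        }
    }

    // "SDK side": knows the element is an int, writes <length><varint payload>.
    static byte[] sdkEncode(int element) {
        ByteArrayOutputStream payload = new ByteArrayOutputStream();
        writeVarInt(element, payload);
        byte[] body = payload.toByteArray();
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        writeVarInt(body.length, wire);
        wire.write(body, 0, body.length);
        return wire.toByteArray();
    }

    // "Runner side": reads the same bytes as <length><opaque bytes>.
    static byte[] runnerDecode(byte[] wire) {
        ByteBuffer in = ByteBuffer.wrap(wire);
        byte[] opaque = new byte[readVarInt(in)];
        in.get(opaque);
        return opaque;
    }

    // "SDK side" again: turns the opaque payload back into the typed value.
    static int sdkDecodePayload(byte[] opaque) {
        return readVarInt(ByteBuffer.wrap(opaque));
    }

    public static void main(String[] args) {
        byte[] opaque = runnerDecode(sdkEncode(300));
        System.out.println("runner sees bytes: " + Arrays.toString(opaque));
        System.out.println("SDK sees value: " + sdkDecodePayload(opaque));
    }
}
```

The runner never needs the integer coder here; it only carries a byte[] that the SDK can later decode.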

     Jan

    [1] https://github.com/apache/beam/blob/dd7945f9f259a2989f9396d1d7a8dcb122711a52/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L657

    On 8/31/21 7:27 PM, Luke Cwik wrote:
    I originally wasn't for making it a composite because it changes
    the "graph" structure but the more I thought about it the more I
    like it.

    On Tue, Aug 31, 2021 at 10:06 AM Robert Bradshaw <[email protected]> wrote:

        On Tue, Aug 31, 2021 at 9:18 AM Luke Cwik <[email protected]> wrote:
        >
        > On Mon, Aug 30, 2021 at 7:07 PM Ke Wu <[email protected]> wrote:
        >>
        >> Hello everyone,
        >>
        >> This is Ke. I am working on enabling TestStream support for
        the Samza Runner in portable mode and discovered something unexpected.
        >>
        >> In my implementation for the Samza Runner, a couple of tests are
        failing with errors like
        >>
        >>
        >> java.lang.ClassCastException: java.lang.Integer cannot be cast to [B
        >>
        >> I noticed that these tests show the same symptom on the Flink
        Runner as well, where they are currently excluded:
        >>
        >> https://issues.apache.org/jira/browse/BEAM-12048
        >> https://issues.apache.org/jira/browse/BEAM-12050
        >>
        >>
        >> After some more digging, I realized that it is caused by the
        combination of the following facts:
        >>
        >> TestStream is a primitive transform, so runners are
        supposed to translate it directly. The most intuitive
        implementation for a runner is to parse the payload and
        decode each TestStream.Event [1] in the runner process, then
        hand it over to subsequent stages.
        >> When TestStream is initialized with Integers, i.e. with
        VarIntCoder, the coder is treated as a custom coder during
        conversion to the protobuf pipeline [3], because VarIntCoder
        is NOT a registered ModelCoder [2], and it is replaced with a
        byte array coder [4] when the runner sends data to the SDK worker.
        >> Therefore an error occurs: the decoded
        TestStream.Event has an Integer as its value, but the remote
        input receiver expects a byte array, causing
        java.lang.ClassCastException: java.lang.Integer cannot be
        cast to [B
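The failure mode described above can be reproduced without Beam. The following is a minimal sketch under that assumption: the class and method names are hypothetical, and the list stands in for the channel between the runner-side decoder and the remote input receiver. Because Java generics are erased, handing an Integer to code that was compiled against byte[] only fails at the point where the element is actually used.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; no Beam classes are involved.
final class WireTypeMismatch {
    // Stand-in for a receiver whose wire coder is a byte array coder.
    static int totalBytes(List<byte[]> elements) {
        int total = 0;
        for (byte[] element : elements) { // the compiler inserts a cast to byte[] here
            total += element.length;
        }
        return total;
    }

    // Returns true if passing a decoded Integer to the byte[] receiver
    // fails with a ClassCastException.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static boolean failsWithClassCast() {
        List decoded = new ArrayList();
        decoded.add(Integer.valueOf(42)); // payload was decoded into an Integer
        try {
            totalBytes((List<byte[]>) decoded); // compiles because of type erasure
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("ClassCastException raised: " + failsWithClassCast());
    }
}
```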
        >>
        >>
        >> In addition, I tried updating all these failing tests to
        use Long instead of Integer, and all of them pass since
        VarLongCoder is a known coder. I do understand that the runner
        process does not have user artifacts staged, so it can only
        use coders in the Beam model when communicating with the SDK
        worker process.
        >>
        >> A couple of questions on this:
        >>
        >> 1. Is it expected that VarIntCoder is not a known coder?
        >
        >
        > Yes, since no one has worked to make it a well-known coder.

        The notion of "integer" vs. "long" is also a language-specific
        detail, so not sure it makes sense as a well-known coder.

        > It can be made a well-known coder and this would solve the
        immediate problem but not the long-term issue of portable
        TestStream not supporting arbitrary types.

        +1. Rather than making the coder a property of TestStream, I
        would be in favor of the TestStream primitive always producing
        bytes (basically, by definition), and providing a composite
        that consists of this followed by a decoding step to give us a
        typed TestStream.
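A rough sketch of that shape, independent of Beam: all names are hypothetical, lists stand in for PCollections, and the 4-byte big-endian integer encoding is an assumption chosen only to keep the example short. The "primitive" only ever hands out bytes, and the typed variant is a composite of the primitive plus a decode step that runs where the coder is known.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration only; lists stand in for PCollections.
final class TypedTestStreamSketch {
    // "Primitive": emits pre-encoded elements and knows nothing about their type.
    static List<byte[]> bytesTestStream(List<byte[]> encodedEvents) {
        return new ArrayList<>(encodedEvents);
    }

    // "Composite": the bytes-only primitive followed by a decoding step.
    static <T> List<T> typedTestStream(List<byte[]> encodedEvents, Function<byte[], T> decoder) {
        List<T> decoded = new ArrayList<>();
        for (byte[] event : bytesTestStream(encodedEvents)) {
            decoded.add(decoder.apply(event));
        }
        return decoded;
    }

    // Example encoding (an assumption): 4-byte big-endian int.
    static byte[] encodeInt(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    static Integer decodeInt(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    public static void main(String[] args) {
        List<byte[]> events = Arrays.asList(encodeInt(1), encodeInt(2), encodeInt(3));
        System.out.println(typedTestStream(events, TypedTestStreamSketch::decodeInt));
    }
}
```

With this split, whatever executes the primitive never has to understand the element type; only the decode step does.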


        >> 2. Is TestStream always supposed to translate the
        payload as raw bytes, so that the runner process can always
        send it to the SDK worker with the default byte array coder and
        ask the SDK worker to decode it accordingly?
        >
        >
        > Having the runner treat it always as bytes and not T is
        likely the best solution but isn't necessary.
        >
        >> 3. If yes to 2), does that mean TestStream needs to be
        translated in a completely different way in portable mode
        than in classic mode, since in classic mode the translator can
        directly translate the payload to its final format?
        >>
        >
        > There are a few ways to fix the current implementation to
        work for all types. One way would be if we required the
        encoded_element to be the "nested" encoding and then ensured
        that the runner uses a WindowedValue<ByteArrayCoder in outer
        context> and the SDK used WindowedValue<T> (note that this
        isn't WindowedValue<LengthPrefix<T>>) for the wire coders.
        This is quite annoying because the runner inserts length
        prefixing in a lot of places (effectively every time it sees
        an unknown type), so we would need to special-case this and
        propagate this correction through any runner-native
        transforms (e.g. GBK) until the SDK consumes it.
        >
        > Another way would be to ensure that the SDK always uses
        LengthPrefix<T> as the PCollection encoding and the
        encoded_element format. This would mean that the runner can
        translate it to a T if it so chooses and won't have the
        annoying special case propagation logic. This leaks the
        length prefixing into the SDK at graph construction time
        which is not what it was meant for.
        >
        > Swapping to use an existing well-known type is by far the
        easiest approach, as you discovered, and won't impact the
        correctness of the tests.
        >
        >>
        >> Best,
        >> Ke
        >>
        >>
        >> [1] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java#L52
        >> [2] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java#L65
        >> [3] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java#L99
        >> [4] https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L93
