I agree with Robert: having TestStream be coder-aware seems very
strange, since it's already followed by a PCollection node that actually
knows the type/coder being used.

Is there any reason TestStream *needs* to be aware of the type its byte
segments represent?

On Tue, Aug 31, 2021, 10:06 AM Robert Bradshaw wrote:

> On Tue, Aug 31, 2021 at 9:18 AM Luke Cwik wrote:
> >
> > On Mon, Aug 30, 2021 at 7:07 PM Ke Wu wrote:
> >>
> >> Hello everyone,
> >>
> >> This is Ke. I am working on enabling TestStream support for the Samza
> Runner in portable mode and discovered something unexpected.
> >>
> >> In my Samza Runner implementation, a couple of tests fail with
> errors like
> >>
> >>
> >> java.lang.ClassCastException: java.lang.Integer cannot be cast to [B
> >>
> >> I noticed these tests show the same symptom on the Flink Runner, where
> they are currently excluded:
> >>
> >> https://issues.apache.org/jira/browse/BEAM-12048
> >> https://issues.apache.org/jira/browse/BEAM-12050
> >>
> >>
> >> After some more digging, I realized that it is caused by a combination
> of the following facts:
> >>
> >> TestStream is a primitive transform, so runners are supposed to
> translate it directly. The most intuitive implementation for each runner
> is to parse the payload and decode the TestStream.Event [1] in the runner
> process, then hand it over to subsequent stages.
> >> When TestStream is used with Integers, i.e. initialized with VarIntCoder,
> the coder is treated as a custom coder during conversion to the protobuf
> pipeline [3], since VarIntCoder is NOT a registered ModelCoder [2], and it
> is replaced with a byte array coder [4] when the runner sends data to the
> SDK worker.
> >> Therefore an error occurs: the decoded TestStream.Event has an
> Integer as its value, but the remote input receiver expects a byte array,
> causing java.lang.ClassCastException: java.lang.Integer cannot be cast to [B
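To make the failure mode concrete, here is a dependency-free Java model of it; it is not Beam code. The names `runnerDecodeEvents` and `sendToSdkWorker` are hypothetical, and a fixed 4-byte big-endian int stands in for VarIntCoder. The runner side decodes each payload to an Integer, while the receiver, wired with a byte array coder, casts every element to `byte[]`:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class TestStreamCastDemo {
  // Hypothetical stand-in for the runner decoding TestStream.Event payloads with the
  // element coder. A fixed 4-byte big-endian int replaces VarIntCoder for brevity.
  static List<Object> runnerDecodeEvents(List<byte[]> encodedElements) {
    List<Object> decoded = new ArrayList<>();
    for (byte[] bytes : encodedElements) {
      decoded.add(ByteBuffer.wrap(bytes).getInt()); // autoboxed to java.lang.Integer
    }
    return decoded;
  }

  // Hypothetical stand-in for the remote input receiver whose wire coder was
  // replaced by a byte array coder, so it casts every element to byte[].
  static int sendToSdkWorker(List<Object> elements) {
    int totalBytes = 0;
    for (Object element : elements) {
      byte[] payload = (byte[]) element; // throws ClassCastException for an Integer
      totalBytes += payload.length;
    }
    return totalBytes;
  }

  static List<byte[]> sampleEvents() {
    List<byte[]> encoded = new ArrayList<>();
    encoded.add(ByteBuffer.allocate(4).putInt(42).array());
    return encoded;
  }

  // True if forwarding runner-decoded Integers to the byte[] receiver fails.
  static boolean decodedElementsFail() {
    try {
      sendToSdkWorker(runnerDecodeEvents(sampleEvents()));
      return false;
    } catch (ClassCastException e) {
      return true;
    }
  }

  // Forwarding the still-encoded bytes instead succeeds and returns the byte count.
  static int forwardRawBytes() {
    return sendToSdkWorker(new ArrayList<Object>(sampleEvents()));
  }

  public static void main(String[] args) {
    System.out.println("decoded elements fail: " + decodedElementsFail()); // true
    System.out.println("raw bytes forwarded: " + forwardRawBytes());        // 4
  }
}
```

Forwarding the payload untouched is what a byte array wire coder can handle; decoding it first in the runner is what breaks.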
> >>
> >>
> >> In addition, I tried updating all of these failing tests to use Long
> instead of Integer, and all tests pass since VarLongCoder is a known
> coder. I do understand that the runner process does not have user artifacts
> staged, so it can only use coders in the Beam model when communicating with
> the SDK worker process.
> >>
> >> A couple of questions on this:
> >>
> >> 1. Is it expected that VarIntCoder is not a known coder?
> >
> >
> > Yes, since no one has done the work to make it a well-known coder.
>
> The notion of "integer" vs. "long" is also a language-specific detail,
> so I'm not sure it makes sense as a well-known coder.
>
> > It can be made a well-known coder, and this would solve the immediate
> problem but not the long-term issue of portable TestStream not supporting
> arbitrary types.
>
> +1. Rather than making the coder a property of TestStream, I would be in
> favor of the TestStream primitive always producing bytes (basically,
> by definition), and providing a composite that consists of this
> followed by a decoding step to give us a typed TestStream.
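A dependency-free Java model of this proposal, with hypothetical names: `bytesTestStream` plays the bytes-only primitive and `typedTestStream` the composite that appends a decode step. In the Beam Java SDK the decode step could plausibly be a ParDo calling `CoderUtils.decodeFromByteArray(coder, bytes)`, but that wiring is an assumption and is not shown here:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class BytesTestStreamSketch {
  // Hypothetical "primitive": emits elements only as opaque byte[], so the
  // runner never needs to know (or decode) the element type.
  static List<byte[]> bytesTestStream(List<byte[]> encodedElements) {
    return new ArrayList<>(encodedElements);
  }

  // Hypothetical composite: the bytes primitive followed by a decoding step
  // that runs in the SDK, where the user's coder is available.
  static <T> List<T> typedTestStream(List<byte[]> encodedElements, Function<byte[], T> decode) {
    List<T> out = new ArrayList<>();
    for (byte[] bytes : bytesTestStream(encodedElements)) {
      out.add(decode.apply(bytes));
    }
    return out;
  }

  // Toy element coder: a fixed 4-byte big-endian int (stand-in for a real coder).
  static byte[] encodeInt(int value) {
    return ByteBuffer.allocate(4).putInt(value).array();
  }

  static Integer decodeInt(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getInt();
  }

  public static void main(String[] args) {
    List<byte[]> encoded = List.of(encodeInt(1), encodeInt(2), encodeInt(3));
    List<Integer> values = typedTestStream(encoded, BytesTestStreamSketch::decodeInt);
    System.out.println(values); // [1, 2, 3]
  }
}
```

The design point is that only the decode step ever sees T, so the runner's translation of the primitive is the same for every element type.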
>
>
> >> 2. Is the TestStream payload always supposed to be translated as raw
> bytes, so that the runner process can always send it to the SDK worker with
> the default byte array coder and ask the SDK worker to decode it accordingly?
> >
> >
> > Having the runner always treat it as bytes and not T is likely the best
> solution, but it isn't necessary.
> >
> >> 3. If yes to 2), does that mean TestStream needs to be translated
> in a completely different way in portable mode than in classic mode, since
> in classic mode the translator can directly translate the payload to its
> final format?
> >>
> >
> > There are a few ways to fix the current implementation to work for all
> types. One way would be if we required the encoded_element to be the
> "nested" encoding and then ensured that the runner uses a
> WindowedValue<ByteArrayCoder in outer context> and the SDK used
> WindowedValue<T> (note that this isn't WindowedValue<LengthPrefix<T>>) for
> the wire coders. This is quite annoying because the runner inserts length
> prefixing in a lot of places (effectively every time it sees an unknown
> type) so we would need to special case this and propagate this correction
> through any runner native transforms (e.g. GBK) until the SDK consumes it.
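To illustrate why length prefixing conflicts with the first option, here is a small stdlib-only Java sketch; these are not Beam's actual coder classes. It assumes the unsigned varint length format used for length prefixes and compares a byte array written in the outer context (raw bytes, no length) with a length-prefixed one. If the runner adds a prefix where the SDK expects `WindowedValue<T>` over the bare nested encoding, the bytes on the wire no longer match what the SDK decodes:

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

public class OuterContextSketch {
  // Unsigned varint (7 bits per byte, high bit = continuation), assumed here to
  // match the length format used for length prefixes.
  static void writeVarInt(int value, ByteArrayOutputStream out) {
    while ((value & ~0x7F) != 0) {
      out.write((value & 0x7F) | 0x80);
      value >>>= 7;
    }
    out.write(value);
  }

  // Byte array in the outer context: the payload is written as-is, with no length,
  // so it only works when nothing follows it in the stream.
  static byte[] encodeBytesOuter(byte[] payload) {
    return payload.clone();
  }

  // Length-prefixed form: varint length, then the payload bytes.
  static byte[] encodeLengthPrefixed(byte[] payload) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    writeVarInt(payload.length, out);
    out.write(payload, 0, payload.length);
    return out.toByteArray();
  }

  public static void main(String[] args) {
    // Nested encoding of some T, e.g. the single varint byte for 42.
    byte[] nestedEncodingOfT = {0x2A};
    System.out.println(Arrays.toString(encodeBytesOuter(nestedEncodingOfT)));     // [42]
    System.out.println(Arrays.toString(encodeLengthPrefixed(nestedEncodingOfT))); // [1, 42]
  }
}
```

The extra leading byte is exactly the kind of difference the runner would have to suppress and propagate through its native transforms under this option.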
> >
> > Another way would be to ensure that the SDK always uses LengthPrefix<T>
> as the PCollection encoding and the encoded_element format. This would mean
> that the runner can translate it to a T if it so chooses and won't have the
> annoying special-case propagation logic. This leaks the length prefixing
> into the SDK at graph construction time, which is not what it was meant for.
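The runner-side benefit of LengthPrefix<T> can be sketched in stdlib-only Java. The helper names are hypothetical and the same unsigned varint length format is assumed. Given a stream of length-prefixed elements, the runner can cut it into per-element byte arrays without knowing anything about T, and leave decoding to the SDK:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

public class LengthPrefixSplitSketch {
  // Unsigned varint writer (7 bits per byte, high bit = continuation).
  static void writeVarInt(int value, ByteArrayOutputStream out) {
    while ((value & ~0x7F) != 0) {
      out.write((value & 0x7F) | 0x80);
      value >>>= 7;
    }
    out.write(value);
  }

  // Matching varint reader; rejects truncated input.
  static int readVarInt(ByteArrayInputStream in) {
    int result = 0;
    int shift = 0;
    while (true) {
      int b = in.read();
      if (b < 0) {
        throw new IllegalArgumentException("truncated varint");
      }
      result |= (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        return result;
      }
      shift += 7;
    }
  }

  // SDK side (hypothetical): each element's encoding of T is wrapped as LengthPrefix<T>.
  static byte[] encodeAll(List<byte[]> encodedElements) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (byte[] element : encodedElements) {
      writeVarInt(element.length, out);
      out.write(element, 0, element.length);
    }
    return out.toByteArray();
  }

  // Runner side (hypothetical): split the stream into opaque elements without knowing T.
  static List<byte[]> splitElements(byte[] stream) {
    ByteArrayInputStream in = new ByteArrayInputStream(stream);
    List<byte[]> elements = new ArrayList<>();
    while (in.available() > 0) {
      int length = readVarInt(in);
      byte[] element = new byte[length];
      if (length > 0 && in.read(element, 0, length) != length) {
        throw new IllegalArgumentException("truncated element");
      }
      elements.add(element);
    }
    return elements;
  }

  public static void main(String[] args) {
    List<byte[]> original = List.of(new byte[] {1, 2, 3}, new byte[0], new byte[] {42});
    List<byte[]> split = splitElements(encodeAll(original));
    System.out.println(split.size()); // 3
  }
}
```

Because every element is self-delimiting, a runner that does have the coder for T could still decode it, but it never has to.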
> >
> > Swapping to an existing well-known type is by far the easiest
> approach, as you discovered, and won't impact the correctness of the
> tests.
> >
> >>
> >> Best,
> >> Ke
> >>
> >>
> >> [1]
> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java#L52
> >> [2]
> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java#L65
> >> [3]
> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java#L99
> >> [4]
> https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L93
>
