I agree with Robert. Having TestStream be coder-aware seems very strange, since it's already followed by a PCollection node that actually knows the type/coder being used.
Is there any reason TestStream *needs* to be aware of the type its byte segments represent?

On Tue, Aug 31, 2021, 10:06 AM Robert Bradshaw <[email protected]> wrote:

> On Tue, Aug 31, 2021 at 9:18 AM Luke Cwik <[email protected]> wrote:
> >
> > On Mon, Aug 30, 2021 at 7:07 PM Ke Wu <[email protected]> wrote:
> >>
> >> Hello everyone,
> >>
> >> This is Ke. I am working on enabling TestStream support for the Samza Runner in portable mode and discovered something unexpected.
> >>
> >> In my implementation for the Samza Runner, a couple of tests are failing with errors like
> >>
> >> java.lang.ClassCastException: java.lang.Integer cannot be cast to [B
> >>
> >> I noticed these tests have the same symptom on the Flink Runner as well, where they are currently excluded:
> >>
> >> https://issues.apache.org/jira/browse/BEAM-12048
> >> https://issues.apache.org/jira/browse/BEAM-12050
> >>
> >> After some more digging, I realized that it is because of the combination of the following facts:
> >>
> >> TestStream is a primitive transform, so runners are supposed to translate it directly. The most intuitive implementation for each runner is to parse the payload and decode TestStream.Event [1] in the runner process, to be handed over to subsequent stages.
> >> When TestStream is initialized with Integers, i.e. with VarIntCoder, since VarIntCoder is NOT a registered ModelCoder [2], it is treated as a custom coder during conversion to the protobuf pipeline [3] and is replaced with the byte array coder [4] when the runner sends data to the SDK worker.
> >> Therefore an error occurs because the decoded TestStream.Event has an Integer as its value but the remote input receiver is expecting a byte array, causing java.lang.ClassCastException: java.lang.Integer cannot be cast to [B
> >>
> >> In addition, I tried to update all these failed tests to use Long instead of Integer, and all tests pass since VarLongCoder is a known coder.
> >> I do understand that the runner process does not have user artifacts staged, so it can only use coders in the Beam model when communicating with the SDK worker process.
> >>
> >> Couple of questions on this:
> >>
> >> 1. Is it expected that VarIntCoder is not a known coder?
> >
> > Yes, since no one has worked to make it a well-known coder.
>
> The notion of "integer" vs. "long" is also a language-specific detail, so I'm not sure it makes sense as a well-known coder.
>
> > It can be made a well-known coder, and this would solve the immediate problem but not the long-term issue of portable TestStream not supporting arbitrary types.
>
> +1. Rather than making the coder a property of TestStream, I would be in favor of the TestStream primitive always producing bytes (basically, by definition), and providing a composite that consists of this followed by a decoding to give us a typed TestStream.
>
> >> 2. Is TestStream always supposed to be translated with the payload as raw bytes, so that the runner process can always send it to the SDK worker with the default byte array coder and ask the SDK worker to decode accordingly?
> >
> > Having the runner treat it always as bytes and not T is likely the best solution but isn't necessary.
> >
> >> 3. If yes to 2), does that mean TestStream needs to be translated in a completely different way in portable mode than in classic mode, since in classic mode the translator can directly translate the payload to its final format?
> >>
> >
> > There are a few ways to fix the current implementation to work for all types. One way would be if we required the encoded_element to be the "nested" encoding and then ensured that the runner uses a WindowedValue<ByteArrayCoder in outer context> and the SDK used WindowedValue<T> (note that this isn't WindowedValue<LengthPrefix<T>>) for the wire coders.
> > This is quite annoying because the runner inserts length prefixing in a lot of places (effectively every time it sees an unknown type), so we would need to special-case this and propagate this correction through any runner-native transforms (e.g. GBK) until the SDK consumes it.
> >
> > Another way would be to ensure that the SDK always uses LengthPrefix<T> as the PCollection encoding and the encoded_element format. This would mean that the runner can translate it to a T if it so chooses and won't have the annoying special-case propagation logic. This leaks the length prefixing into the SDK at graph construction time, which is not what it was meant for.
> >
> > Swapping to use an existing well-known type is by far the easiest approach, as you had discovered, and won't impact the correctness of the tests.
> >
> >>
> >> Best,
> >> Ke
> >>
> >> [1] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java#L52
> >> [2] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java#L65
> >> [3] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java#L99
> >> [4] https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L93
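
To make the failure mode and the "primitive always produces bytes" idea from the thread above a bit more concrete, here is a rough sketch in plain Java with no Beam dependencies. Everything in it is hypothetical: the class and method names are made up for illustration, and the varint helpers are a simplified stand-in for an SDK-side coder, not Beam's actual VarIntCoder. It only shows why a runner-side decode hands the wrong Java type to a receiver that expects byte[], and why forwarding the encoded bytes untouched avoids that.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical names throughout; this is an illustration, not Beam code.
class TestStreamBytesSketch {

    // Simplified stand-in for an SDK-side coder the runner does not know about.
    static byte[] encodeVarInt(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int v = value;
        // Emit 7 bits at a time, low-order group first; the high bit marks "more bytes follow".
        while ((v & ~0x7F) != 0) {
            out.write((v & 0x7F) | 0x80);
            v >>>= 7;
        }
        out.write(v);
        return out.toByteArray();
    }

    // Inverse of encodeVarInt: reassemble the 7-bit groups.
    static int decodeVarInt(byte[] bytes) {
        int result = 0;
        int shift = 0;
        for (byte b : bytes) {
            result |= (b & 0x7F) << shift;
            shift += 7;
        }
        return result;
    }

    // Stand-in for the remote input receiver: with an unknown coder it expects opaque bytes.
    static byte[] receiveOnWire(Object element) {
        return (byte[]) element; // ClassCastException if the element is not a byte[]
    }

    public static void main(String[] args) {
        byte[] payload = encodeVarInt(42);

        // Failure mode: the runner decodes the payload itself and forwards an Integer.
        Object decodedByRunner = decodeVarInt(payload);
        try {
            receiveOnWire(decodedByRunner);
        } catch (ClassCastException e) {
            System.out.println("runner-side decode fails: " + e.getMessage());
        }

        // Bytes-only primitive: the runner forwards the payload untouched,
        // and the typed value is recovered by a decode step on the SDK side.
        byte[] onWire = receiveOnWire(payload);
        System.out.println("SDK-side decode gives: " + decodeVarInt(onWire));
    }
}
```

In this sketch the runner never needs to know the element type; only the decode step that follows does, which is the role the typed composite would play in Robert's suggestion.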
