Awesome! Thank you, Luke and Robert. I also created https://issues.apache.org/jira/browse/BEAM-12828 to track the unit test conversion. I can take it once I have updated the Samza runner to support TestStream in portable mode.
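
For anyone skimming the thread later, here is a minimal plain-Java sketch (no Beam dependencies) of the failure discussed below and of the "primitive produces bytes, composite decodes" idea. Everything in it is an assumption made for illustration: the class and method names are hypothetical, and the fixed 4-byte encoding is only a stand-in for an SDK coder the runner does not know, not VarIntCoder's real format.

```java
import java.nio.ByteBuffer;

class TestStreamBytesSketch {
  // Hypothetical stand-in for an SDK-side coder that the runner does not know.
  public static byte[] sdkEncode(int value) {
    return ByteBuffer.allocate(4).putInt(value).array();
  }

  public static int sdkDecode(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getInt();
  }

  // Stand-in for the runner's wire path for unknown coders: it only accepts byte[].
  public static byte[] runnerWireEncode(Object element) {
    return (byte[]) element; // throws ClassCastException if handed an Integer
  }

  // Case 1: the runner decoded the TestStream payload into an Integer first.
  public static boolean decodedElementFails() {
    try {
      runnerWireEncode(Integer.valueOf(42));
      return false;
    } catch (ClassCastException e) {
      return true;
    }
  }

  // Case 2: the runner keeps the element as bytes; only the SDK side decodes it.
  public static int bytesRoundTrip(int value) {
    byte[] payload = sdkEncode(value);         // element as stored in the payload
    byte[] onWire = runnerWireEncode(payload); // runner never sees the typed value
    return sdkDecode(onWire);                  // decode step of the composite
  }

  public static void main(String[] args) {
    System.out.println(decodedElementFails()); // true
    System.out.println(bytesRoundTrip(42));    // 42
  }
}
```

The point of the second case is that the runner never needs to understand the element type, which is why it would work for arbitrary coders and not only for well-known ones.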

> On Aug 31, 2021, at 2:05 PM, Robert Bradshaw <[email protected]> wrote:
>
> Created https://issues.apache.org/jira/browse/BEAM-12827 to track this.
>
> +1 to converting tests to just use longs for better coverage for now.
>
> Also, yes, this is very similar to the issues encountered by Reads,
> but the solution is a bit simpler as there's no need for the
> TestStream primitive to interact with the decoded version of the
> elements (unlike Reads, where the sources often give elements in
> un-encoded form) and no user code to run.
>
> - Robert
>
>
>
> On Tue, Aug 31, 2021 at 11:00 AM Jan Lukavský <[email protected]> wrote:
>>
>> This looks similar to (and likely has the same cause as) what I
>> experienced when making primitive Read supported by Flink. The final
>> solution would be to make SDK coders known to the runner of the same SDK
>> (already present in various different threads). But until then, the solution
>> seems to be something like [1]. The root cause is that the executable stage
>> expects its input to be encoded by the SDK harness, and that part is missing
>> when the transform is inlined (like Read in my case, or TestStream in your
>> case). The intoWireTypes method simulates precisely this part - it encodes
>> the PCollection via the coder defined in the SDK harness and then decodes it
>> by the coder defined by the runner (which match on the binary level, but
>> produce different types).
>>
>> Jan
>>
>> [1]
>> https://github.com/apache/beam/blob/dd7945f9f259a2989f9396d1d7a8dcb122711a52/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L657
>>
>> On 8/31/21 7:27 PM, Luke Cwik wrote:
>>
>> I originally wasn't for making it a composite because it changes the "graph"
>> structure but the more I thought about it the more I like it.
>>
>> On Tue, Aug 31, 2021 at 10:06 AM Robert Bradshaw <[email protected]> wrote:
>>>
>>> On Tue, Aug 31, 2021 at 9:18 AM Luke Cwik <[email protected]> wrote:
>>>>
>>>> On Mon, Aug 30, 2021 at 7:07 PM Ke Wu <[email protected]> wrote:
>>>>>
>>>>> Hello everyone,
>>>>>
>>>>> This is Ke. I am working on enabling TestStream support for the Samza
>>>>> Runner in portable mode and discovered something unexpected.
>>>>>
>>>>> In my implementation for the Samza Runner, a couple of tests are failing
>>>>> with errors like
>>>>>
>>>>>
>>>>> java.lang.ClassCastException: java.lang.Integer cannot be cast to [B
>>>>>
>>>>> I noticed these tests show the same symptom on the Flink Runner as well,
>>>>> where they are currently excluded:
>>>>>
>>>>> https://issues.apache.org/jira/browse/BEAM-12048
>>>>> https://issues.apache.org/jira/browse/BEAM-12050
>>>>>
>>>>>
>>>>> After some more digging, I realized that it is because of the combination
>>>>> of the following facts:
>>>>>
>>>>> TestStream is a primitive transform, so runners are supposed to
>>>>> translate it directly. The most intuitive implementation for each runner
>>>>> is to parse the payload and decode TestStream.Event [1] in the runner
>>>>> process, to be handed over to subsequent stages.
>>>>>
>>>>> When TestStream is used with Integers, i.e. initialized with VarIntCoder,
>>>>> since VarIntCoder is NOT a registered ModelCoder [2], it will be treated
>>>>> as a custom coder during conversion to the protobuf pipeline [3] and will
>>>>> be replaced with the byte array coder [4] when the runner sends data to
>>>>> the SDK worker.
>>>>>
>>>>> Therefore an error occurs because the decoded TestStream.Event has
>>>>> Integer as its value but the remote input receiver is expecting a byte
>>>>> array, causing java.lang.ClassCastException: java.lang.Integer cannot be
>>>>> cast to [B
>>>>>
>>>>>
>>>>> In addition, I tried updating all these failed tests to use Long instead
>>>>> of Integer, and all tests pass since VarLongCoder is a known coder.
>>>>> I do understand that the runner process does not have user artifacts
>>>>> staged, so it can only use coders in the Beam model when communicating
>>>>> with the SDK worker process.
>>>>>
>>>>> A couple of questions on this:
>>>>>
>>>>> 1. Is it expected that VarIntCoder is not a known coder?
>>>>
>>>>
>>>> Yes since no one has worked to make it a well known coder.
>>>
>>> The notion of "integer" vs. "long" is also a language-specific detail,
>>> so I'm not sure it makes sense as a well-known coder.
>>>
>>>> It can be made a well known coder and this would solve the immediate
>>>> problem but not the long term issue of portable TestStream not supporting
>>>> arbitrary types.
>>>
>>> +1. Rather than making coder a property of TestStream, I would be in
>>> favor of the TestStream primitive always producing bytes (basically,
>>> by definition), and providing a composite that consists of this
>>> followed by a decoding to give us a typed TestStream.
>>>
>>>
>>>>> 2. Is the TestStream payload always supposed to be translated as raw
>>>>> bytes, so that the runner process can always send it to the SDK worker
>>>>> with the default byte array coder and ask the SDK worker to decode it
>>>>> accordingly?
>>>>
>>>>
>>>> Having the runner treat it always as bytes and not T is likely the best
>>>> solution but isn't necessary.
>>>>
>>>>> 3. If yes to 2), does it mean that TestStream needs to be translated in
>>>>> a completely different way in portable mode than in classic mode, since
>>>>> in classic mode the translator can directly translate the payload to its
>>>>> final format?
>>>>>
>>>>
>>>> There are a few ways to fix the current implementation to work for all
>>>> types. One way would be if we required the encoded_element to be the
>>>> "nested" encoding and then ensured that the runner uses a
>>>> WindowedValue<ByteArrayCoder in outer context> and the SDK used
>>>> WindowedValue<T> (note that this isn't WindowedValue<LengthPrefix<T>>) for
>>>> the wire coders. This is quite annoying because the runner inserts length
>>>> prefixing in a lot of places (effectively every time it sees an unknown
>>>> type) so we would need to special case this and propagate this correction
>>>> through any runner native transforms (e.g. GBK) until the SDK consumes it.
>>>>
>>>> Another way would be to ensure that the SDK always uses LengthPrefix<T> as
>>>> the PCollection encoding and the encoded_element format. This would mean
>>>> that the runner can translate it to a T if it so chooses and won't have
>>>> the annoying special case propagation logic. This leaks the length
>>>> prefixing into the SDK at graph construction time which is not what it was
>>>> meant for.
>>>>
>>>> Swapping to use an existing well known type is by far the easiest approach
>>>> as you had discovered and won't impact the correctness of the tests.
>>>>
>>>>>
>>>>> Best,
>>>>> Ke
>>>>>
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java#L52
>>>>> [2]
>>>>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java#L65
>>>>> [3]
>>>>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java#L99
>>>>> [4]
>>>>> https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L93
