Awesome! Thank you Luke and Robert. 

Also created https://issues.apache.org/jira/browse/BEAM-12828 to track unit test 
conversion. I can take it after I update the Samza runner to support TestStream 
in portable mode.

> On Aug 31, 2021, at 2:05 PM, Robert Bradshaw <[email protected]> wrote:
> 
> Created https://issues.apache.org/jira/browse/BEAM-12827 to track this.
> 
> +1 to converting tests to just use longs for better coverage for now.
> 
> Also, yes, this is very similar to the issues encountered by Reads,
> but the solution is a bit simpler as there's no need for the
> TestStream primitive to interact with the decoded version of the
> elements (unlike Reads, where the sources often give elements in
> un-encoded form) and no user code to run.
> 
> - Robert
> 
> 
> 
> On Tue, Aug 31, 2021 at 11:00 AM Jan Lukavský <[email protected]> wrote:
>> 
>> This looks similar to (and likely has the same cause as) what I 
>> experienced when making primitive Read supported by Flink. The final 
>> solution would be to make SDK coders known to a runner of the same SDK 
>> (already discussed in various other threads). But until then, the solution 
>> seems to be something like [1]. The root cause is that the executable stage 
>> expects its input to be encoded by the SDK harness, and that part is missing 
>> when the transform is inlined (like Read in my case, or TestStream in your 
>> case). The intoWireTypes method simulates precisely this part: it encodes 
>> the PCollection via the coder defined in the SDK harness and then decodes it 
>> with the coder defined by the runner (the two match at the binary level but 
>> produce different types).
>> 
>> Jan
>> 
>> [1] 
>> https://github.com/apache/beam/blob/dd7945f9f259a2989f9396d1d7a8dcb122711a52/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L657
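
To make the encode-then-decode step described above concrete, here is a 
minimal sketch in plain Java with no Beam dependency. WireTypeSketch and all 
of its methods are hypothetical names made up for illustration; this is not 
the intoWireTypes code from [1]. The varint encoding is a simplified stand-in 
for an SDK-side integer coder, and the sketch ignores length prefixing.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical illustration only; none of these names are Beam APIs.
class WireTypeSketch {
  // "SDK side": simplified base-128 varint encoding of a non-negative int.
  static byte[] sdkEncode(int value) {
    if (value < 0) {
      throw new IllegalArgumentException("sketch handles non-negative values only");
    }
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    int v = value;
    while (v >= 0x80) {
      out.write((v & 0x7F) | 0x80); // low 7 bits plus continuation flag
      v >>>= 7;
    }
    out.write(v);
    return out.toByteArray();
  }

  // "SDK side": inverse of sdkEncode, used later inside the SDK worker.
  static int sdkDecode(byte[] bytes) {
    int result = 0;
    int shift = 0;
    for (byte b : bytes) {
      result |= (b & 0x7F) << shift;
      shift += 7;
    }
    return result;
  }

  // "Runner side": the coder is unknown here, so the bytes stay an opaque byte[].
  static byte[] runnerDecode(byte[] wire) {
    return wire.clone();
  }

  // Encode with the SDK-side coder, then decode with the runner-side coder.
  // Same bytes on the wire, but the Java type changes from Integer to byte[].
  static byte[] intoWireType(int element) {
    return runnerDecode(sdkEncode(element));
  }
}
```

The value the runner holds after intoWireType is a byte[], which is what the 
executable stage expects, and the SDK side can still recover the original 
integer with sdkDecode.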
>> 
>> On 8/31/21 7:27 PM, Luke Cwik wrote:
>> 
>> I originally wasn't for making it a composite because it changes the "graph" 
>> structure but the more I thought about it the more I like it.
>> 
>> On Tue, Aug 31, 2021 at 10:06 AM Robert Bradshaw <[email protected]> wrote:
>>> 
>>> On Tue, Aug 31, 2021 at 9:18 AM Luke Cwik <[email protected]> wrote:
>>>> 
>>>> On Mon, Aug 30, 2021 at 7:07 PM Ke Wu <[email protected]> wrote:
>>>>> 
>>>>> Hello everyone,
>>>>> 
>>>>> This is Ke. I am working on enabling TestStream support for the Samza Runner in 
>>>>> portable mode and discovered something unexpected.
>>>>> 
>>>>> In my implementation for the Samza Runner, a couple of tests are failing with 
>>>>> errors like
>>>>> 
>>>>> 
>>>>> java.lang.ClassCastException: java.lang.Integer cannot be cast to [B
>>>>> 
>>>>> I noticed that these tests show the same symptom on the Flink Runner as well, 
>>>>> where they are currently excluded:
>>>>> 
>>>>> https://issues.apache.org/jira/browse/BEAM-12048
>>>>> https://issues.apache.org/jira/browse/BEAM-12050
>>>>> 
>>>>> 
>>>>> After some more digging, I realized that it is caused by the combination of 
>>>>> the following facts:
>>>>> 
>>>>> 1. TestStream is a primitive transform, so runners are supposed to 
>>>>> translate it directly. The most intuitive implementation for each runner 
>>>>> is to parse the payload and decode TestStream.Event [1] in the runner 
>>>>> process, to be handed over to subsequent stages.
>>>>> 2. When TestStream is initialized with Integers, i.e. with VarIntCoder, 
>>>>> VarIntCoder is NOT a registered ModelCoder [2], so it is treated as a 
>>>>> custom coder during conversion to the protobuf pipeline [3] and is 
>>>>> replaced with a byte array coder [4] when the runner sends data to the 
>>>>> SDK worker.
>>>>> 3. Therefore an error occurs because the decoded TestStream.Event has an 
>>>>> Integer as its value but the remote input receiver is expecting a byte 
>>>>> array, causing java.lang.ClassCastException: java.lang.Integer cannot be 
>>>>> cast to [B
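
The failure mode described above can be reproduced in isolation. The following 
is a minimal sketch in plain Java with no Beam dependency; CastMismatchSketch, 
receive and failsWithClassCast are hypothetical names made up for illustration 
and only stand in for the remote input receiver, they are not Beam classes.

```java
// Hypothetical illustration only; none of these names are Beam APIs.
class CastMismatchSketch {
  // Stand-in for a receiver that was set up with the byte array wire coder:
  // it assumes every incoming value is already encoded as byte[].
  static int receive(Object value) {
    byte[] payload = (byte[]) value; // fails at runtime if value is an Integer
    return payload.length;
  }

  // Returns true when handing the value to the receiver fails with
  // ClassCastException, false when the receiver accepts it.
  static boolean failsWithClassCast(Object value) {
    try {
      receive(value);
      return false;
    } catch (ClassCastException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    // An element the runner already decoded to Integer is rejected.
    System.out.println(failsWithClassCast(Integer.valueOf(42)));
    // The same element left in encoded form is accepted.
    System.out.println(failsWithClassCast(new byte[] {42}));
  }
}
```

The first call reports a failure and the second does not, which mirrors why 
the tests pass once the runner hands over a value of the type the wire coder 
expects.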
>>>>> 
>>>>> 
>>>>> In addition, I tried updating all the failing tests to use Long instead 
>>>>> of Integer, and they all pass since VarLongCoder is a known coder. 
>>>>> I do understand that the runner process does not have user artifacts staged, 
>>>>> so it can only use coders in the Beam model when communicating with the SDK 
>>>>> worker process.
>>>>> 
>>>>> A couple of questions on this:
>>>>> 
>>>>> 1. Is it expected that VarIntCoder is not a known coder?
>>>> 
>>>> 
>>>> Yes, since no one has worked to make it a well known coder.
>>> 
>>> The notion of "integer" vs. "long" is also a language-specific detail,
>>> so I'm not sure it makes sense as a well-known coder.
>>> 
>>>> It can be made a well known coder and this would solve the immediate 
>>>> problem but not the long term issue of portable TestStream not supporting 
>>>> arbitrary types.
>>> 
>>> +1. Rather than making the coder a property of TestStream, I would be in
>>> favor of the TestStream primitive always producing bytes (basically,
>>> by definition), and providing a composite that consists of this
>>> followed by a decoding step to give us a typed TestStream.
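
The shape of that proposal can be sketched in plain Java with no Beam 
dependency. TypedTestStreamSketch, bytesTestStream and typedTestStream are 
hypothetical names made up for illustration, and lists stand in for 
PCollections; this is only an assumption about how such a composite could be 
layered, not an existing Beam API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration only; none of these names are Beam APIs.
class TypedTestStreamSketch {
  // "Primitive": by definition it only ever emits encoded elements, so a
  // runner never needs to know the element type or its coder.
  static List<byte[]> bytesTestStream(List<byte[]> encodedEvents) {
    return new ArrayList<>(encodedEvents);
  }

  // "Composite": the bytes-only primitive followed by a decode step. The
  // decoder runs where the coder is known (the SDK side in the proposal).
  static <T> List<T> typedTestStream(
      List<byte[]> encodedEvents, Function<byte[], T> decoder) {
    List<T> decoded = new ArrayList<>();
    for (byte[] element : bytesTestStream(encodedEvents)) {
      decoded.add(decoder.apply(element));
    }
    return decoded;
  }
}
```

With this layering, supporting a new element type only means supplying a 
different decoder; the primitive itself is unchanged.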
>>> 
>>> 
>>>>> 2. Is TestStream always supposed to translate the payload as raw 
>>>>> bytes, so that the runner process can always send it to the SDK worker with 
>>>>> the default byte array coder and ask the SDK worker to decode it accordingly?
>>>> 
>>>> 
>>>> Having the runner treat it always as bytes and not T is likely the best 
>>>> solution but isn't necessary.
>>>> 
>>>>> 3. If yes to 2), does that mean TestStream needs to be translated in 
>>>>> a completely different way in portable mode than in classic mode, since in 
>>>>> classic mode the translator can directly translate the payload to its final 
>>>>> format?
>>>>> 
>>>> 
>>>> There are a few ways to fix the current implementation to work for all 
>>>> types. One way would be if we required the encoded_element to be the 
>>>> "nested" encoding and then ensured that the runner uses a 
>>>> WindowedValue<ByteArrayCoder in outer context> and the SDK used 
>>>> WindowedValue<T> (note that this isn't WindowedValue<LengthPrefix<T>>) for 
>>>> the wire coders. This is quite annoying because the runner inserts length 
>>>> prefixing in a lot of places (effectively every time it sees an unknown 
>>>> type) so we would need to special case this and propagate this correction 
>>>> through any runner native transforms (e.g. GBK) until the SDK consumes it.
>>>> 
>>>> Another way would be to ensure that the SDK always uses LengthPrefix<T> as 
>>>> the PCollection encoding and the encoded_element format. This would mean 
>>>> that the runner can translate it to a T if it so chooses and won't have 
>>>> the annoying special case propagation logic. This leaks the length 
>>>> prefixing into the SDK at graph construction time which is not what it was 
>>>> meant for.
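
For readers unfamiliar with length prefixing, here is a minimal sketch in 
plain Java with no Beam dependency. LengthPrefixSketch and its methods are 
hypothetical names made up for illustration, and the framing (a base-128 
varint length followed by the payload) is a simplified assumption rather than 
a copy of Beam's LengthPrefixCoder.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

// Hypothetical illustration only; none of these names are Beam APIs.
class LengthPrefixSketch {
  // Frames an already-encoded element as: varint(length) followed by payload.
  static byte[] lengthPrefix(byte[] payload) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    int len = payload.length;
    while (len >= 0x80) {
      out.write((len & 0x7F) | 0x80); // low 7 bits plus continuation flag
      len >>>= 7;
    }
    out.write(len);
    out.write(payload, 0, payload.length);
    return out.toByteArray();
  }

  // Reads the varint length and returns the payload without interpreting it.
  // This is all a runner needs in order to pass an unknown type along.
  static byte[] stripLengthPrefix(byte[] framed) {
    int len = 0;
    int shift = 0;
    int pos = 0;
    while (true) {
      int b = framed[pos++] & 0xFF;
      len |= (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        break;
      }
      shift += 7;
    }
    return Arrays.copyOfRange(framed, pos, pos + len);
  }
}
```

Because the element boundary is recoverable from the prefix alone, the side 
that does not know T can still split, store and forward elements as bytes.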
>>>> 
>>>> Swapping to an existing well known type is by far the easiest approach, 
>>>> as you discovered, and won't impact the correctness of the tests.
>>>> 
>>>>> 
>>>>> Best,
>>>>> Ke
>>>>> 
>>>>> 
>>>>> [1] 
>>>>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java#L52
>>>>> [2] 
>>>>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java#L65
>>>>> [3] 
>>>>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java#L99
>>>>> [4] 
>>>>> https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L93
