> Read does not have a translation in portability, so it needs to be a
> primitive transform explicitly implemented by the runner. The
> encoding/decoding has to happen in the runner.

Could you help me understand this a bit more? IIRC, the fact that Read is NOT
translated in portable mode means precisely that it is a composite transform
rather than a primitive, because all primitive transforms are required to be
translated. In addition, Read is a composite transform built on Impulse, which
produces a dummy byte element [1] to trigger the subsequent
ParDo/ExecutableStage, where decoding of the actual source happens [2].
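As a toy sketch of that structure (plain Java, not the actual Beam API --
`impulse` and `expand` are made-up stand-ins for the Impulse primitive and the
downstream ParDo/ExecutableStage):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy model of the Read expansion: an "impulse" emits one dummy element,
// and a downstream stage (standing in for the ParDo/ExecutableStage)
// expands it into the source's actual, decoded records.
public class ImpulseModel {

    // Impulse produces a single empty byte array to trigger downstream work.
    static List<byte[]> impulse() {
        List<byte[]> out = new ArrayList<>();
        out.add(new byte[0]);
        return out;
    }

    // Stand-in for the SDK-worker ParDo that turns the impulse into
    // actual source reads (this is where decoding happens).
    static List<Integer> expand(List<byte[]> impulses,
                                Function<byte[], List<Integer>> readFn) {
        List<Integer> out = new ArrayList<>();
        for (byte[] dummy : impulses) {
            out.addAll(readFn.apply(dummy));
        }
        return out;
    }

    public static void main(String[] args) {
        // The "source" here is hypothetical; it just yields three records.
        List<Integer> records = expand(impulse(), dummy -> List.of(1, 2, 3));
        System.out.println(records); // prints [1, 2, 3]
    }
}
```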

> There seems to be no role for the SDK harness with regard to the TestStream,
> because the elements are already encoded by the submitting SDK. The coders
> must match nevertheless, because you can have Events of type
> KV<KV<WindowedValue<Integer, Object>>>, and what will and what will not get
> length-prefixed depends on which parts exactly are "known" (model) coders and
> which are not. Encoding the whole value as a single byte array will not work
> for the consuming SDK harness, which will see that there should be nested
> KvCoders instead.


I don’t think I fully understand what you are saying here. TestStream is
currently a primitive transform, therefore there is no role for the SDK
harness. This is what the proposal aims to change: make TestStream a composite
transform consisting of a primitive transform and a subsequent ParDo that
decodes to the desired format.
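Roughly, the proposed split could look like this toy model (plain Java, not
the Beam API or the real TestStream payload format; the varint coder below
merely stands in for VarIntCoder, and all the names are made up):

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

// Toy model of the proposed composite TestStream: the primitive hands the
// runner raw encoded bytes, and a subsequent SDK-side step decodes them
// with the element coder, which is only available on the SDK worker.
public class TestStreamModel {

    // Minimal base-128 varint encoding, standing in for VarIntCoder.
    static byte[] encodeVarInt(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long v = value & 0xFFFFFFFFL;
        while ((v & ~0x7FL) != 0) {
            out.write((int) ((v & 0x7F) | 0x80));
            v >>>= 7;
        }
        out.write((int) v);
        return out.toByteArray();
    }

    static int decodeVarInt(byte[] bytes) {
        int result = 0, shift = 0;
        for (byte b : bytes) {
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) break;
            shift += 7;
        }
        return result;
    }

    // The "primitive": forwards already-encoded element payloads untouched,
    // so the runner never needs the element coder.
    static List<byte[]> primitiveEmit(List<byte[]> encodedEvents) {
        return new ArrayList<>(encodedEvents);
    }

    // The SDK-side "ParDo": decodes the raw bytes into typed elements,
    // where the SDK coder is available.
    static List<Integer> decodeStage(List<byte[]> rawBytes) {
        List<Integer> out = new ArrayList<>();
        for (byte[] b : rawBytes) out.add(decodeVarInt(b));
        return out;
    }

    public static void main(String[] args) {
        List<byte[]> events = List.of(encodeVarInt(5), encodeVarInt(300));
        System.out.println(decodeStage(primitiveEmit(events))); // prints [5, 300]
    }
}
```

The point of the split is that the runner-translated primitive only ever sees
opaque bytes, so no custom coder has to exist in the runner process.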


[1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Impulse.java#L39
[2] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L149
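To illustrate the length-prefixing point quoted above with a toy example
(plain Java, not Beam code; the wire layouts are simplified stand-ins for what
the runner actually emits): a KV whose key uses a known coder and whose value
uses an unknown, length-prefixed coder produces different wire bytes than
length-prefixing the whole element as one opaque byte array:

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

// Toy illustration of why a KV element cannot be shipped as one opaque,
// length-prefixed byte array: a consumer expecting a nested KvCoder sees
// a different wire layout.
public class KvWireModel {

    // Minimal varint encoding for small non-negative ints (assumption:
    // the "known" key coder writes no length prefix, like a model coder).
    static byte[] varint(int v) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((v & ~0x7F) != 0) {
            out.write((v & 0x7F) | 0x80);
            v >>>= 7;
        }
        out.write(v);
        return out.toByteArray();
    }

    static byte[] concat(byte[]... parts) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] p : parts) out.write(p, 0, p.length);
        return out.toByteArray();
    }

    // Component-wise wire format: the known key coder writes a raw varint,
    // and only the unknown value coder gets length-prefixed.
    static byte[] kvWire(int key, byte[] value) {
        return concat(varint(key), varint(value.length), value);
    }

    // Naive format: encode the whole KV, then length-prefix it as one blob.
    static byte[] naiveWire(int key, byte[] value) {
        byte[] whole = concat(varint(key), value);
        return concat(varint(whole.length), whole);
    }

    public static void main(String[] args) {
        byte[] value = {1, 2, 3};
        // The two layouts are not byte-compatible, so a consumer expecting
        // the nested form cannot decode the naive form.
        System.out.println(Arrays.equals(kvWire(7, value), naiveWire(7, value))); // prints false
    }
}
```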
 

> On Aug 31, 2021, at 3:21 PM, Jan Lukavský <[email protected]> wrote:
> 
> On 9/1/21 12:13 AM, Ke Wu wrote:
>> Hi Jan,
>> 
>> Here is my understanding,
>> 
>> The runner is brought up by the job server driver, which is up and running
>> before job submission, i.e., it is job agnostic. Therefore, the runner does
>> not have any SDK coders available, and artifact staging only happens for
>> SDK workers.
>> 
>> You are right that Read and TestStream are sources; however, the one thing
>> that distinguishes them is that the Read transform is a composite transform
>> and the decoding happens in a ParDo/ExecutableStage, i.e., on the SDK
>> worker.
> Read does not have a translation in portability, so it needs to be a
> primitive transform explicitly implemented by the runner. The
> encoding/decoding has to happen in the runner.
>> 
>> The proposal here is also to make the public-facing TestStream transform a
>> composite transform instead of a primitive, so that decoding occurs on the
>> SDK worker side where the SDK coder is available. The primitive that powers
>> TestStream would be translated directly by the runner to always produce raw
>> bytes, and these raw bytes would be decoded on the SDK worker side.
> There seems to be no role for the SDK harness with regard to the TestStream,
> because the elements are already encoded by the submitting SDK. The coders
> must match nevertheless, because you can have Events of type
> KV<KV<WindowedValue<Integer, Object>>>, and what will and what will not get
> length-prefixed depends on which parts exactly are "known" (model) coders and
> which are not. Encoding the whole value as a single byte array will not work
> for the consuming SDK harness, which will see that there should be nested
> KvCoders instead.
>> 
>> Best,
>> Ke
>> 
>>> On Aug 31, 2021, at 2:56 PM, Jan Lukavský <[email protected]> wrote:
>>> 
>>> Sorry if I'm missing something obvious, but I don't quite see the 
>>> difference between Read and TestStream regarding the discussed issue with 
>>> coders. Couple of thoughts:
>>> 
>>>  a) both Read and TestStream are _sources_ - they produce elements that are 
>>> consumed by downstream transforms
>>> 
>>>  b) the coder of a particular PCollection is defined by the Pipeline proto 
>>> - it is the (client side) SDK that owns the Pipeline and that defines all 
>>> the coders
>>> 
>>>  c) runners must adhere to these coders, because otherwise there is risk of 
>>> coder mismatch, most probably on edges like x-lang transforms or inlined 
>>> transforms
>>> 
>>> I tried the approach of encoding the output of Read into a byte array as
>>> well, but that turns out to have a problem: once there is a (partially)
>>> known coder in play, this does not work, because the consuming transform
>>> (executable stage) expects to see the wire coder - that is not simply a
>>> byte array, because the type of the elements might be, for instance,
>>> KV<K, V>, where KvCoder is one of the ModelCoders. That does not encode
>>> using LengthPrefixCoder and as such will be incompatible with
>>> LengthPrefixCoder(ByteArrayCoder). The TestStream needs to know the coder
>>> of the elements, because that defines where exactly length-prefixing must
>>> or must not be inserted. The logic in LengthPrefixUnknownCoders [1] is
>>> recursive for ModelCoders.
>>> 
>>> [1] https://github.com/apache/beam/blob/ff70e740a2155592dfcb302ff6303cc19660a268/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L48
>>> On 8/31/21 11:29 PM, Ke Wu wrote:
>>>> Awesome! Thank you Luke and Robert. 
>>>> 
>>>> Also created https://issues.apache.org/jira/browse/BEAM-12828 to track
>>>> the unit test conversion. I could take it after I update the Samza runner
>>>> to support TestStream in portable mode.
>>>> 
>>>>> On Aug 31, 2021, at 2:05 PM, Robert Bradshaw <[email protected]> wrote:
>>>>> 
>>>>> Created https://issues.apache.org/jira/browse/BEAM-12827 to track this.
>>>>> 
>>>>> +1 to converting tests to just use longs for better coverage for now.
>>>>> 
>>>>> Also, yes, this is very similar to the issues encountered by Reads,
>>>>> but the solution is a bit simpler as there's no need for the
>>>>> TestStream primitive to interact with the decoded version of the
>>>>> elements (unlike Reads, where the sources often give elements in
>>>>> un-encoded form) and no user code to run.
>>>>> 
>>>>> - Robert
>>>>> 
>>>>> 
>>>>> 
>>>>> On Tue, Aug 31, 2021 at 11:00 AM Jan Lukavský <[email protected]> wrote:
>>>>>> 
>>>>>> This looks similar to (and likely has the same cause as) what I
>>>>>> experienced when making the primitive Read supported by Flink. The
>>>>>> final solution would be to make SDK coders known to the runner of the
>>>>>> same SDK (already discussed in various other threads). Until then, the
>>>>>> solution seems to be something like [1]. The root cause is that the
>>>>>> executable stage expects its input to be encoded by the SDK harness,
>>>>>> and that part is missing when the transform is inlined (like Read in my
>>>>>> case, or TestStream in your case). The intoWireTypes method simulates
>>>>>> precisely this part - it encodes the PCollection via the coder defined
>>>>>> in the SDK harness and then decodes it via the coder defined by the
>>>>>> runner (which match on the binary level but produce different types).
>>>>>> 
>>>>>> Jan
>>>>>> 
>>>>>> [1] https://github.com/apache/beam/blob/dd7945f9f259a2989f9396d1d7a8dcb122711a52/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L657
>>>>>> 
>>>>>> On 8/31/21 7:27 PM, Luke Cwik wrote:
>>>>>> 
>>>>>> I originally wasn't for making it a composite because it changes the
>>>>>> "graph" structure, but the more I thought about it, the more I liked it.
>>>>>> 
>>>>>> On Tue, Aug 31, 2021 at 10:06 AM Robert Bradshaw <[email protected]> wrote:
>>>>>>> 
>>>>>>> On Tue, Aug 31, 2021 at 9:18 AM Luke Cwik <[email protected]> wrote:
>>>>>>>> 
>>>>>>>> On Mon, Aug 30, 2021 at 7:07 PM Ke Wu <[email protected]> wrote:
>>>>>>>>> 
>>>>>>>>> Hello everyone,
>>>>>>>>> 
>>>>>>>>> This is Ke. I am working on enabling TestStream support for the
>>>>>>>>> Samza Runner in portable mode and discovered something unexpected.
>>>>>>>>> 
>>>>>>>>> In my implementation for the Samza Runner, a couple of tests are
>>>>>>>>> failing with errors like
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> java.lang.ClassCastException: java.lang.Integer cannot be cast to [B
>>>>>>>>> 
>>>>>>>>> I noticed these tests have the same symptom on the Flink Runner as
>>>>>>>>> well, where they are currently excluded:
>>>>>>>>> 
>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-12048
>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-12050
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> After some more digging, I realized that it is caused by the
>>>>>>>>> combination of the following facts:
>>>>>>>>> 
>>>>>>>>> TestStream is a primitive transform; therefore, runners are supposed
>>>>>>>>> to translate it directly. The most intuitive implementation for each
>>>>>>>>> runner is to parse the payload and decode TestStream.Event [1] in
>>>>>>>>> the runner process, to be handed over to subsequent stages.
>>>>>>>>> When TestStream is initialized with Integers, i.e. with VarIntCoder,
>>>>>>>>> since VarIntCoder is NOT a registered ModelCoder [2], it will be
>>>>>>>>> treated as a custom coder during conversion to the protobuf pipeline
>>>>>>>>> [3] and will be replaced with a byte array coder [4] when the runner
>>>>>>>>> sends data to the SDK worker.
>>>>>>>>> Therefore, an error occurs because the decoded TestStream.Event has
>>>>>>>>> an Integer as its value but the remote input receiver is expecting a
>>>>>>>>> byte array, causing java.lang.ClassCastException: java.lang.Integer
>>>>>>>>> cannot be cast to [B.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> In addition, I tried updating all these failing tests to use Long
>>>>>>>>> instead of Integer, and all of them pass, since VarLongCoder is a
>>>>>>>>> known coder. I do understand that the runner process does not have
>>>>>>>>> user artifacts staged, so it can only use coders in the Beam model
>>>>>>>>> when communicating with the SDK worker process.
>>>>>>>>> 
>>>>>>>>> Couple of questions on this:
>>>>>>>>> 
>>>>>>>>> 1. Is it expected that VarIntCoder is not a known coder?
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Yes, since no one has worked on making it a well-known coder.
>>>>>>> 
>>>>>>> The notion of "integer" vs. "long" is also a language-specific
>>>>>>> detail, so I'm not sure it makes sense as a well-known coder.
>>>>>>> 
>>>>>>>> It could be made a well-known coder, and this would solve the
>>>>>>>> immediate problem, but not the long-term issue of portable TestStream
>>>>>>>> not supporting arbitrary types.
>>>>>>> 
>>>>>>> +1. Rather than making coder a property of TestStream, I would be in
>>>>>>> favor of the TestStream primitive always producing bytes (basically,
>>>>>>> by definition), and providing a composite that consists of this
>>>>>>> followed by a decoding to give us a typed TestStream.
>>>>>>> 
>>>>>>> 
>>>>>>>>> 2. Is TestStream always supposed to translate the payload as raw
>>>>>>>>> bytes, so that the runner process can always send it to the SDK
>>>>>>>>> worker with the default byte array coder and ask the SDK worker to
>>>>>>>>> decode accordingly?
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Having the runner treat it always as bytes and not T is likely the 
>>>>>>>> best solution but isn't necessary.
>>>>>>>> 
>>>>>>>>> 3. If yes to 2), then does it mean TestStream needs to be
>>>>>>>>> translated in a completely different way in portable mode than in
>>>>>>>>> classic mode, since in classic mode the translator can directly
>>>>>>>>> translate the payload to its final format?
>>>>>>>>> 
>>>>>>>> 
>>>>>>>> There are a few ways to fix the current implementation to work for
>>>>>>>> all types. One way would be to require the encoded_element to use the
>>>>>>>> "nested" encoding and then ensure that the runner uses a
>>>>>>>> WindowedValue<ByteArrayCoder in outer context> and the SDK uses a
>>>>>>>> WindowedValue<T> (note that this isn't WindowedValue<LengthPrefix<T>>)
>>>>>>>> for the wire coders. This is quite annoying because the runner
>>>>>>>> inserts length prefixing in a lot of places (effectively every time
>>>>>>>> it sees an unknown type), so we would need to special-case this and
>>>>>>>> propagate the correction through any runner-native transforms (e.g.
>>>>>>>> GBK) until the SDK consumes it.
>>>>>>>> 
>>>>>>>> Another way would be to ensure that the SDK always uses
>>>>>>>> LengthPrefix<T> as the PCollection encoding and the encoded_element
>>>>>>>> format. This would mean the runner can translate it to a T if it so
>>>>>>>> chooses, and it avoids the annoying special-case propagation logic.
>>>>>>>> However, this leaks length prefixing into the SDK at graph
>>>>>>>> construction time, which is not what it was meant for.
>>>>>>>> 
>>>>>>>> Swapping to an existing well-known type is by far the easiest
>>>>>>>> approach, as you discovered, and won't impact the correctness of the
>>>>>>>> tests.
>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Best,
>>>>>>>>> Ke
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> [1] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java#L52
>>>>>>>>> [2] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java#L65
>>>>>>>>> [3] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java#L99
>>>>>>>>> [4] https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L93
>>>> 
>> 
