Hello everyone,

This is Ke. I am working on enabling TestStream support for the Samza Runner in
portable mode and discovered something unexpected.

In my implementation for the Samza Runner, a couple of tests are failing with
errors like

java.lang.ClassCastException: java.lang.Integer cannot be cast to [B

I noticed that these tests show the same symptom on the Flink Runner, where they
are currently excluded:
https://issues.apache.org/jira/browse/BEAM-12048 
https://issues.apache.org/jira/browse/BEAM-12050 

After some more digging, I realized that this is caused by a combination of the
following facts:

TestStream is a primitive transform, so runners are supposed to translate it
directly. The most intuitive implementation for each runner is to parse the
payload and decode the TestStream.Event [1] in the runner process, then hand the
events over to subsequent stages.
When TestStream is used with Integers, i.e. initialized with VarIntCoder, the
coder is treated as a custom coder during conversion to the protobuf pipeline [3],
because VarIntCoder is NOT a registered ModelCoder [2]. It is then replaced with
a byte array coder [4] when the runner sends data to the SDK worker.
Therefore, an error occurs: the decoded TestStream.Event carries an Integer as
its value, but the remote input receiver expects a byte array, causing
java.lang.ClassCastException: java.lang.Integer cannot be cast to [B
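The mismatch can be reproduced in plain Java without Beam. In this sketch, CastMismatchSketch and bytesReceived are hypothetical names: the receiver stands in for one whose wire coder became a byte array coder, while the runner hands it TestStream elements that were already decoded into Integer values.

```java
import java.util.Arrays;
import java.util.List;

final class CastMismatchSketch {
  // Hypothetical receiver whose wire coder was swapped for a byte array coder:
  // it assumes every element it receives is still encoded as byte[].
  static int bytesReceived(List<Object> elements) {
    int total = 0;
    for (Object element : elements) {
      byte[] encoded = (byte[]) element; // throws ClassCastException for an Integer
      total += encoded.length;
    }
    return total;
  }

  public static void main(String[] args) {
    // Runner side: the TestStream events were eagerly decoded with VarIntCoder,
    // so the elements are Integer objects rather than encoded bytes.
    List<Object> decodedOnRunner = Arrays.asList(1, 2, 3);
    try {
      bytesReceived(decodedOnRunner);
    } catch (ClassCastException e) {
      // On Java 8 the message reads: java.lang.Integer cannot be cast to [B
      System.out.println("ClassCastException: " + e.getMessage());
    }
  }
}
```

On newer JDKs the exception message is worded differently (it adds module and class loader details), but the exception type is the same.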

In addition, I tried updating all of these failing tests to use Long instead of
Integer, and all of them pass, since VarLongCoder is a known coder. I do
understand that the runner process does not have user artifacts staged, so it
can only use coders in the Beam model when communicating with the SDK worker
process.

A couple of questions on this:

1. Is it expected that VarIntCoder is not a known coder?
2. Is TestStream always supposed to be translated with its payload kept as raw
bytes, so that the runner process can always send it to the SDK worker with the
default byte array coder and ask the SDK worker to decode it accordingly?
3. If yes to 2), does that mean TestStream needs to be translated in a
completely different way in portable mode than in classic mode? In classic mode,
the translator can directly translate the payload into its final format.
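A plain-Java sketch of the flow question 2 describes: the runner forwards each element as opaque bytes, and only the SDK side, which has the element coder, decodes it. OpaquePayloadSketch, encodeVarInt, decodeVarInt and forwardOpaque are hypothetical names, not Beam APIs, and the varint format here is a generic LEB128-style encoding that is not claimed to be byte-for-byte identical to VarIntCoder's.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class OpaquePayloadSketch {
  // SDK side, when building the payload: unsigned LEB128-style varint.
  static byte[] encodeVarInt(int value) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    long v = value & 0xFFFFFFFFL; // treat the int as unsigned 32-bit
    while ((v & ~0x7FL) != 0) {
      out.write((int) ((v & 0x7F) | 0x80)); // low 7 bits plus continuation bit
      v >>>= 7;
    }
    out.write((int) v);
    return out.toByteArray();
  }

  // SDK worker side: decodes with the element coder it has available.
  static int decodeVarInt(byte[] bytes) {
    long result = 0;
    int shift = 0;
    for (byte b : bytes) {
      result |= (long) (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        break; // no continuation bit: last byte of this value
      }
      shift += 7;
    }
    return (int) result;
  }

  // Runner side: passes the elements through without decoding them.
  static List<byte[]> forwardOpaque(List<byte[]> payloadElements) {
    return new ArrayList<>(payloadElements);
  }

  public static void main(String[] args) {
    List<byte[]> payload = Arrays.asList(encodeVarInt(1), encodeVarInt(300));
    for (byte[] element : forwardOpaque(payload)) {
      System.out.println(decodeVarInt(element)); // prints 1, then 300
    }
  }
}
```

The design point is that the runner never needs to know the element type: it only moves byte[] values, which any runner can describe with the byte array coder.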

Best,
Ke


[1] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java#L52
[2] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java#L65
[3] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java#L99
[4] https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L93
