Hello everyone,

This is Ke. I am working on enabling TestStream support for the Samza Runner in portable mode and discovered something unexpected.
In my implementation for the Samza Runner, a couple of tests are failing with errors like

java.lang.ClassCastException: java.lang.Integer cannot be cast to [B

I noticed that these tests show the same symptom on the Flink Runner as well, where they are currently excluded:

https://issues.apache.org/jira/browse/BEAM-12048
https://issues.apache.org/jira/browse/BEAM-12050

After some more digging, I realized that this is caused by the combination of the following facts:

- TestStream is a primitive transform, so runners are supposed to translate it directly. The most intuitive implementation for each runner is to parse the payload and decode the TestStream.Event [1] elements in the runner process, then hand them over to subsequent stages.
- When TestStream is used with Integers, i.e. initialized with VarIntCoder, the coder is treated as a custom coder during conversion to the protobuf pipeline [3], because VarIntCoder is NOT a registered ModelCoder [2]. It is then replaced with a byte array coder [4] when the runner sends data to the SDK worker.

Therefore an error occurs: the decoded TestStream.Event has an Integer as its value, but the remote input receiver expects a byte array, causing

java.lang.ClassCastException: java.lang.Integer cannot be cast to [B

In addition, I tried updating all of the failing tests to use Long instead of Integer, and they all pass, since VarLongCoder is a known coder. I do understand that the runner process does not have user artifacts staged, so it can only use coders in the Beam model when communicating with the SDK worker process.

A couple of questions on this:

1. Is it expected that VarIntCoder is not a known coder?
2. Is TestStream always supposed to have its payload translated as raw bytes, so that the runner process can always send it to the SDK worker with the default byte array coder and ask the SDK worker to decode it accordingly?
3. If yes to 2), does that mean TestStream needs to be translated in a completely different way in portable mode than in classic mode, since in classic mode the translator can directly translate the payload to its final format?

Best,
Ke

[1] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java#L52
[2] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java#L65
[3] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java#L99
[4] https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L93
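
P.S. To make the mismatch concrete, here is a minimal sketch in plain Java with no Beam dependency. All names in it (WireCoderMismatchSketch, sendToSdkWorker, encodeInt, decodeInt) are hypothetical stand-ins, not Beam APIs, and the fixed 4-byte encoding is only a placeholder for whatever bytes the real coder would produce. It just shows why handing a decoded Integer to a receiver that expects byte[] fails, while handing over the still-encoded bytes does not.

```java
import java.nio.ByteBuffer;

final class WireCoderMismatchSketch {

    // Hypothetical stand-in for the remote input receiver: because the element
    // coder is unknown to the runner, the wire coder only accepts raw byte[].
    static int sendToSdkWorker(Object element) {
        byte[] payload = (byte[]) element; // ClassCastException if element is an Integer
        return payload.length;
    }

    // Placeholder encoding (fixed 4 bytes, big-endian); NOT Beam's VarIntCoder format.
    static byte[] encodeInt(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    // Stand-in for the SDK worker decoding the opaque bytes with the user's coder.
    static int decodeInt(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    public static void main(String[] args) {
        // Case 1: the runner decoded the TestStream event into an Integer.
        Object decodedByRunner = Integer.valueOf(1);
        try {
            sendToSdkWorker(decodedByRunner);
        } catch (ClassCastException e) {
            System.out.println("Fails like the excluded tests: " + e.getMessage());
        }

        // Case 2: the runner keeps the element as opaque bytes and lets the
        // SDK worker decode it.
        byte[] raw = encodeInt(1);
        int sent = sendToSdkWorker(raw);
        System.out.println("Sent " + sent + " bytes; worker decodes " + decodeInt(raw));
    }
}
```

Case 2 corresponds to what question 2 asks about: keeping the TestStream payload as raw bytes on the runner side whenever the coder is not one the runner knows.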
