> Read does not have translation in portability, so the implementation is that
> it needs to be a primitive transform explicitly implemented by the runner. The
> encoding/decoding has to happen in the runner.
Could you help me understand this a bit more? IIRC, the fact that Read is NOT
translated in portable mode means precisely that it is a composite transform
rather than a primitive, because all primitive transforms are required to be
translated. In addition, Read is a composite built on Impulse, which produces
dummy bytes [1] to trigger a subsequent ParDo/ExecutableStage, where the actual
source is decoded [2].

> There seems to be no role of the SDK harness with regard to the TestStream,
> because the elements are already encoded by the submitting SDK. The coders
> must match nevertheless, because you can have Events of type
> KV<KV<WindowedValue<Integer, Object>>> and what will and what will not get
> length-prefixed depends on which parts exactly are "known" (model) coders and
> which are not. Encoding the whole value as a single byte array will not work
> for the consuming SDK harness, which will see that there should be nested
> KvCoders instead.

I don't think I fully understand what you are saying here. TestStream is
currently a primitive transform, therefore the SDK harness plays no role. This
is exactly what the proposal aims to change: make TestStream a composite
transform consisting of a primitive transform and a subsequent ParDo that
decodes to the desired format.

[1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Impulse.java#L39
[2] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L149

> On Aug 31, 2021, at 3:21 PM, Jan Lukavský wrote:
>
> On 9/1/21 12:13 AM, Ke Wu wrote:
>> Hi Jan,
>>
>> Here is my understanding:
>>
>> The runner is brought up by the job server driver, which is up and running
>> before the job submission, i.e. it is job agnostic. Therefore, the runner it
>> brings up does not have any SDK coder available, and artifact staging only
>> happens for SDK workers.
>>
>> You are right that Read and TestStream are sources; however, the one thing
>> that distinguishes them is that the Read transform is a composite transform
>> and the decoding happens in ParDo/ExecutableStage, i.e. on the SDK worker.
> Read does not have translation in portability, so the implementation is that
> it needs to be a primitive transform explicitly implemented by the runner. The
> encoding/decoding has to happen in the runner.
>>
>> The proposal here is also to make the public-facing TestStream transform a
>> composite transform instead of a primitive, so that the decoding occurs on
>> the SDK worker side, where the SDK coder is available. The primitive that
>> powers TestStream will be translated directly by the runner to always
>> produce raw bytes, and these raw bytes will be decoded on the SDK worker
>> side.
> There seems to be no role of the SDK harness with regard to the TestStream,
> because the elements are already encoded by the submitting SDK. The coders
> must match nevertheless, because you can have Events of type
> KV<KV<WindowedValue<Integer, Object>>> and what will and what will not get
> length-prefixed depends on which parts exactly are "known" (model) coders and
> which are not. Encoding the whole value as a single byte array will not work
> for the consuming SDK harness, which will see that there should be nested
> KvCoders instead.
>>
>> Best,
>> Ke
>>
>>> On Aug 31, 2021, at 2:56 PM, Jan Lukavský wrote:
>>>
>>> Sorry if I'm missing something obvious, but I don't quite see the
>>> difference between Read and TestStream regarding the discussed issue with
>>> coders. A couple of thoughts:
>>>
>>> a) both Read and TestStream are _sources_: they produce elements that are
>>> consumed by downstream transforms
>>>
>>> b) the coder of a particular PCollection is defined by the Pipeline proto;
>>> it is the (client-side) SDK that owns the Pipeline and defines all the
>>> coders
>>>
>>> c) runners must adhere to these coders, because otherwise there is a risk
>>> of coder mismatch, most probably on edges like x-lang transforms or inlined
>>> transforms
>>>
>>> I tried the approach of encoding the output of Read into a byte array as
>>> well, but that turns out to have the problem that once there is a
>>> (partially) known coder in play, this does not work, because the consuming
>>> transform (executable stage) expects to see the wire coder. That is not
>>> simply a byte array, because the type of elements might be, for instance,
>>> KV<K, V>, where KvCoder is one of the ModelCoders. That does not encode
>>> using LengthPrefixCoder and as such will be incompatible with
>>> LengthPrefixCoder(ByteArrayCoder). The TestStream needs to know the coder
>>> of elements, because that defines exactly where length-prefixing must or
>>> must not be inserted. The logic in LengthPrefixUnknownCoders [1] is
>>> recursive for ModelCoders.
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/ff70e740a2155592dfcb302ff6303cc19660a268/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L48
>>>
>>> On 8/31/21 11:29 PM, Ke Wu wrote:
>>>> Awesome! Thank you Luke and Robert.
>>>>
>>>> Also created https://issues.apache.org/jira/browse/BEAM-12828 to track
>>>> unit test conversion. I could take it after I update the Samza runner to
>>>> support TestStream in portable mode.
>>>>
>>>>> On Aug 31, 2021, at 2:05 PM, Robert Bradshaw wrote:
>>>>>
>>>>> Created https://issues.apache.org/jira/browse/BEAM-12827 to track this.
>>>>>
>>>>> +1 to converting tests to just use longs for better coverage for now.
>>>>>
>>>>> Also, yes, this is very similar to the issues encountered by Reads,
>>>>> but the solution is a bit simpler as there's no need for the
>>>>> TestStream primitive to interact with the decoded version of the
>>>>> elements (unlike Reads, where the sources often give elements in
>>>>> un-encoded form) and no user code to run.
>>>>>
>>>>> - Robert
>>>>>
>>>>> On Tue, Aug 31, 2021 at 11:00 AM Jan Lukavský wrote:
>>>>>>
>>>>>> This looks similar to (and likely has the same cause as) what I
>>>>>> experienced when making primitive Read supported by Flink. The final
>>>>>> solution would be to make SDK coders known to the runner of the same SDK
>>>>>> (already discussed in several other threads). But until then, the
>>>>>> solution seems to be something like [1]. The root cause is that the
>>>>>> executable stage expects its input to be encoded by the SDK harness, and
>>>>>> that part is missing when the transform is inlined (like Read in my
>>>>>> case, or TestStream in your case). The intoWireTypes method simulates
>>>>>> precisely this part: it encodes the PCollection via the coder defined in
>>>>>> the SDK harness and then decodes it with the coder defined by the runner
>>>>>> (which match on the binary level, but produce different types).
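A small simulation can illustrate the round trip Jan describes for `intoWireTypes`. This is a sketch in plain Python, not Beam's API. `VarIntCoder`, `BytesCoder`, `LengthPrefixCoder` and `into_wire_types` are hypothetical stand-ins that only mimic the byte layout (unsigned varints and varint length prefixes), and they handle non-negative integers only. The sketch shows the SDK-side wire coder `LengthPrefix(VarInt)` and the runner-side `LengthPrefix(ByteArray)` agreeing on bytes while yielding different in-memory types.

```python
# Simulation only: these classes mimic the byte layout of Beam's coders,
# they are not Beam's Python or Java API.

def encode_varint(n):
    """Unsigned base-128 varint (the layout Beam's varint coders use for n >= 0)."""
    if n < 0:
        raise ValueError("this sketch only handles non-negative ints")
    out = bytearray()
    while True:
        low7 = n & 0x7F
        n >>= 7
        if n:
            out.append(low7 | 0x80)  # continuation bit: more bytes follow
        else:
            out.append(low7)
            return bytes(out)


def decode_varint(data, pos=0):
    """Decode a varint starting at pos; return (value, position after it)."""
    result = shift = 0
    while True:
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        shift += 7
        if not byte & 0x80:
            return result, pos


class VarIntCoder:
    """Stand-in for the SDK's own (non-model) integer coder."""
    def encode(self, value):
        if not isinstance(value, int):
            raise TypeError(f"expected int, got {type(value).__name__}")
        return encode_varint(value)

    def decode(self, data):
        return decode_varint(data)[0]


class BytesCoder:
    """Stand-in for ByteArrayCoder: passes raw bytes through unchanged."""
    def encode(self, value):
        if not isinstance(value, bytes):
            raise TypeError(f"expected bytes, got {type(value).__name__}")
        return value

    def decode(self, data):
        return bytes(data)


class LengthPrefixCoder:
    """Writes varint(len(payload)) followed by the inner coder's payload."""
    def __init__(self, inner):
        self.inner = inner

    def encode(self, value):
        payload = self.inner.encode(value)
        return encode_varint(len(payload)) + payload

    def decode(self, data):
        length, pos = decode_varint(data)
        return self.inner.decode(data[pos:pos + length])


def into_wire_types(values, sdk_wire_coder, runner_wire_coder):
    """Hypothetical helper: encode as the SDK harness would, decode as the runner would."""
    return [runner_wire_coder.decode(sdk_wire_coder.encode(v)) for v in values]


sdk_wire = LengthPrefixCoder(VarIntCoder())    # the SDK knows the element type
runner_wire = LengthPrefixCoder(BytesCoder())  # the runner only sees opaque bytes

converted = into_wire_types([1, 300], sdk_wire, runner_wire)
print(converted)  # [b'\x01', b'\xac\x02']
```

Re-encoding `converted[1]` with `runner_wire` reproduces `sdk_wire.encode(300)` byte for byte. By contrast, `runner_wire.encode(300)` raises `TypeError`, which is roughly the Python counterpart of the `Integer cannot be cast to [B` failure reported in this thread.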
>>>>>>
>>>>>> Jan
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/apache/beam/blob/dd7945f9f259a2989f9396d1d7a8dcb122711a52/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L657
>>>>>>
>>>>>> On 8/31/21 7:27 PM, Luke Cwik wrote:
>>>>>>
>>>>>> I originally wasn't for making it a composite because it changes the
>>>>>> "graph" structure, but the more I thought about it the more I like it.
>>>>>>
>>>>>> On Tue, Aug 31, 2021 at 10:06 AM Robert Bradshaw wrote:
>>>>>>>
>>>>>>> On Tue, Aug 31, 2021 at 9:18 AM Luke Cwik wrote:
>>>>>>>>
>>>>>>>> On Mon, Aug 30, 2021 at 7:07 PM Ke Wu wrote:
>>>>>>>>>
>>>>>>>>> Hello everyone,
>>>>>>>>>
>>>>>>>>> This is Ke. I am working on enabling TestStream support for the Samza
>>>>>>>>> Runner in portable mode and discovered something unexpected.
>>>>>>>>>
>>>>>>>>> In my implementation for the Samza Runner, a couple of tests are failing
>>>>>>>>> with errors like
>>>>>>>>>
>>>>>>>>> java.lang.ClassCastException: java.lang.Integer cannot be cast to [B
>>>>>>>>>
>>>>>>>>> I noticed these tests have the same symptom on the Flink Runner as
>>>>>>>>> well, where they are currently excluded:
>>>>>>>>>
>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-12048
>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-12050
>>>>>>>>>
>>>>>>>>> After some more digging, I realized that it is caused by the
>>>>>>>>> combination of the following facts:
>>>>>>>>>
>>>>>>>>> 1. TestStream is a primitive transform, so runners are supposed to
>>>>>>>>>    translate it directly. The most intuitive implementation for each
>>>>>>>>>    runner is to parse the payload and decode TestStream.Event [1] in
>>>>>>>>>    the runner process, to be handed over to subsequent stages.
>>>>>>>>> 2. When TestStream is used with Integers, i.e. initialized with
>>>>>>>>>    VarIntCoder, it is treated as a custom coder during conversion to
>>>>>>>>>    the protobuf pipeline [3], because VarIntCoder is NOT a registered
>>>>>>>>>    ModelCoder [2], and it is replaced with a byte array coder [4] when
>>>>>>>>>    the runner sends data to the SDK worker.
>>>>>>>>> 3. Therefore an error occurs: the decoded TestStream.Event has an
>>>>>>>>>    Integer as its value, but the remote input receiver expects a byte
>>>>>>>>>    array, causing java.lang.ClassCastException: java.lang.Integer
>>>>>>>>>    cannot be cast to [B
>>>>>>>>>
>>>>>>>>> In addition, I tried updating all these failed tests to use Long
>>>>>>>>> instead of Integer, and all tests pass since VarLongCoder is a known
>>>>>>>>> coder. I do understand that the runner process does not have user
>>>>>>>>> artifacts staged, so it can only use coders in the Beam model when
>>>>>>>>> communicating with the SDK worker process.
>>>>>>>>>
>>>>>>>>> A couple of questions on this:
>>>>>>>>>
>>>>>>>>> 1. Is it expected that VarIntCoder is not a known coder?
>>>>>>>>
>>>>>>>> Yes, since no one has worked to make it a well-known coder.
>>>>>>>
>>>>>>> The notion of "integer" vs. "long" is also a language-specific detail,
>>>>>>> so I'm not sure it makes sense as a well-known coder.
>>>>>>>
>>>>>>>> It can be made a well-known coder, and this would solve the immediate
>>>>>>>> problem but not the long-term issue of portable TestStream not
>>>>>>>> supporting arbitrary types.
>>>>>>>
>>>>>>> +1. Rather than making the coder a property of TestStream, I would be
>>>>>>> in favor of the TestStream primitive always producing bytes (basically,
>>>>>>> by definition), and providing a composite that consists of this
>>>>>>> followed by a decoding step to give us a typed TestStream.
>>>>>>>
>>>>>>>>> 2. Is the TestStream payload always supposed to be translated as raw
>>>>>>>>> bytes, so that the runner process can always send it to the SDK
>>>>>>>>> worker with the default byte array coder and ask the SDK worker to
>>>>>>>>> decode it accordingly?
>>>>>>>>
>>>>>>>> Having the runner always treat it as bytes and not T is likely the
>>>>>>>> best solution, but isn't necessary.
>>>>>>>>
>>>>>>>>> 3. If yes to 2), does it mean TestStream needs to be translated in a
>>>>>>>>> completely different way in portable mode than in classic mode,
>>>>>>>>> since in classic mode the translator can directly translate the
>>>>>>>>> payload to its final format?
>>>>>>>>>
>>>>>>>>
>>>>>>>> There are a few ways to fix the current implementation to work for all
>>>>>>>> types. One way would be if we required the encoded_element to be the
>>>>>>>> "nested" encoding and then ensured that the runner uses a
>>>>>>>> WindowedValue<ByteArrayCoder in outer context> and the SDK uses
>>>>>>>> WindowedValue<T> (note that this isn't WindowedValue<LengthPrefix<T>>)
>>>>>>>> for the wire coders. This is quite annoying because the runner inserts
>>>>>>>> length prefixing in a lot of places (effectively every time it sees an
>>>>>>>> unknown type), so we would need to special-case this and propagate
>>>>>>>> this correction through any runner-native transforms (e.g. GBK) until
>>>>>>>> the SDK consumes it.
>>>>>>>>
>>>>>>>> Another way would be to ensure that the SDK always uses
>>>>>>>> LengthPrefix<T> as the PCollection encoding and the encoded_element
>>>>>>>> format. This would mean that the runner can translate it to a T if it
>>>>>>>> so chooses and won't need the annoying special-case propagation logic.
>>>>>>>> This leaks the length prefixing into the SDK at graph construction
>>>>>>>> time, which is not what it was meant for.
>>>>>>>>
>>>>>>>> Swapping to an existing well-known type is by far the easiest
>>>>>>>> approach, as you discovered, and won't impact the correctness of the
>>>>>>>> tests.
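The difference between the two options above is easiest to see at the byte level. The following is a minimal sketch. It assumes T is a non-negative integer with a varint encoding, and it leaves out the WindowedValue wrapper entirely. `length_prefix` and `strip_length_prefix` are hypothetical helpers, not Beam functions.

```python
# Simulation only (not Beam's API): byte layouts for the two options above,
# using a varint-encoded integer as the element type T.

def encode_varint(n):
    """Unsigned base-128 varint for n >= 0."""
    if n < 0:
        raise ValueError("this sketch only handles non-negative ints")
    out = bytearray()
    while True:
        low7 = n & 0x7F
        n >>= 7
        if n:
            out.append(low7 | 0x80)
        else:
            out.append(low7)
            return bytes(out)


def decode_varint(data, pos=0):
    """Return (value, position after the varint)."""
    result = shift = 0
    while True:
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        shift += 7
        if not byte & 0x80:
            return result, pos


def length_prefix(payload):
    return encode_varint(len(payload)) + payload


def strip_length_prefix(data):
    length, pos = decode_varint(data)
    return data[pos:pos + length]


element = 300

# Option 1: encoded_element is T's nested encoding; the runner must forward it
# as ByteArrayCoder in the outer context, i.e. with no prefix added.
encoded_element_1 = encode_varint(element)
wire_ok = encoded_element_1                         # special-cased runner path
wire_default = length_prefix(encoded_element_1)     # runner's usual LengthPrefix(bytes)
sdk_reads_ok = decode_varint(wire_ok)[0]            # SDK wire coder is plain T
sdk_reads_default = decode_varint(wire_default)[0]  # same coder, wrong framing

# Option 2: the SDK declares LengthPrefix<T> for the PCollection and the payload,
# so the runner's usual LengthPrefix(bytes) handling is byte-for-byte compatible.
encoded_element_2 = length_prefix(encode_varint(element))
runner_bytes = strip_length_prefix(encoded_element_2)  # runner sees opaque bytes
wire_2 = length_prefix(runner_bytes)                   # and re-frames them unchanged
sdk_reads_2 = decode_varint(strip_length_prefix(wire_2))[0]

print(sdk_reads_ok, sdk_reads_default, sdk_reads_2)  # 300 2 300
```

Under option 1, a single runner code path that applies its usual length prefix is enough for the SDK to silently read 2 instead of 300. That is why this option needs the special-case propagation described above. Under option 2, the runner's ordinary framing already is the declared encoding.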
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Ke
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java#L52
>>>>>>>>> [2]
>>>>>>>>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java#L65
>>>>>>>>> [3]
>>>>>>>>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java#L99
>>>>>>>>> [4]
>>>>>>>>> https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L93
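Jan's point that length-prefixing is decided recursively, depending on which parts of a coder are model coders, can be sketched as follows. This is a simplified model, not the actual LengthPrefixUnknownCoders code. Coders are plain (urn, components) trees, and only a few real well-known URNs are listed. `example:custom_int` is a hypothetical URN standing in for a non-model coder such as VarIntCoder. The real logic also handles existing length-prefix coders and other well-known coders, which this sketch omits.

The sketch illustrates four points from the thread:

- Unknown leaves inside a KV are wrapped individually.
- The SDK-side and runner-side wire coders agree on bytes.
- Encoding the whole KV as one byte array does not agree with them.
- A KV of `beam:coder:varint:v1` (the model coder for VarLongCoder) is left untouched, which is consistent with the Long-based tests passing.

```python
from dataclasses import dataclass

# Simplified model (not Beam's LengthPrefixUnknownCoders): coders are
# (urn, components) trees; only a few real well-known URNs are listed.


@dataclass(frozen=True)
class CoderSpec:
    urn: str
    components: tuple = ()


LENGTH_PREFIX = "beam:coder:length_prefix:v1"
BYTES = CoderSpec("beam:coder:bytes:v1")
LEAF_MODEL_URNS = {"beam:coder:bytes:v1", "beam:coder:varint:v1"}
COMPOSITE_MODEL_URNS = {"beam:coder:kv:v1", "beam:coder:iterable:v1"}


def length_prefix_unknown(spec, replace_with_bytes):
    """Keep model coders, recurse into their components, and wrap each unknown
    coder in a length prefix (replaced by bytes for the runner-side view)."""
    if spec.urn in LEAF_MODEL_URNS:
        return spec
    if spec.urn in COMPOSITE_MODEL_URNS:
        return CoderSpec(spec.urn, tuple(
            length_prefix_unknown(c, replace_with_bytes) for c in spec.components))
    return CoderSpec(LENGTH_PREFIX, (BYTES if replace_with_bytes else spec,))


def encode_varint(n):
    """Unsigned base-128 varint for n >= 0."""
    if n < 0:
        raise ValueError("this sketch only handles non-negative ints")
    out = bytearray()
    while True:
        low7 = n & 0x7F
        n >>= 7
        if n:
            out.append(low7 | 0x80)
        else:
            out.append(low7)
            return bytes(out)


def encode(spec, value):
    """Encode value per spec. Bytes only occur under a length prefix in this
    sketch, so they are written raw; a KV writes its key, then its value."""
    if spec.urn in ("beam:coder:varint:v1", "example:custom_int"):
        return encode_varint(value)
    if spec.urn == "beam:coder:bytes:v1":
        return bytes(value)
    if spec.urn == LENGTH_PREFIX:
        payload = encode(spec.components[0], value)
        return encode_varint(len(payload)) + payload
    if spec.urn == "beam:coder:kv:v1":
        key, val = value
        return encode(spec.components[0], key) + encode(spec.components[1], val)
    raise ValueError(f"unsupported coder {spec.urn}")


CUSTOM_INT = CoderSpec("example:custom_int")  # hypothetical non-model int coder
VARLONG = CoderSpec("beam:coder:varint:v1")   # model coder (VarLongCoder in Java)
kv_int = CoderSpec("beam:coder:kv:v1", (CUSTOM_INT, CUSTOM_INT))
kv_long = CoderSpec("beam:coder:kv:v1", (VARLONG, VARLONG))

sdk_wire = length_prefix_unknown(kv_int, replace_with_bytes=False)
runner_wire = length_prefix_unknown(kv_int, replace_with_bytes=True)

# SDK-side KV<LP(int), LP(int)> and runner-side KV<LP(bytes), LP(bytes)> agree.
same_bytes = encode(sdk_wire, (1, 300)) == encode(runner_wire, (b"\x01", b"\xac\x02"))
# Wrapping the whole KV as one LP(bytes) value does not.
whole_value = encode(CoderSpec(LENGTH_PREFIX, (BYTES,)), encode(kv_int, (1, 300)))

print(same_bytes)                                       # True
print(whole_value == encode(sdk_wire, (1, 300)))        # False
print(length_prefix_unknown(kv_long, True) == kv_long)  # True
```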

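Robert's suggestion can be sketched as below: a TestStream primitive that always produces bytes, wrapped in a composite that then decodes them. This is a hedged, self-contained simulation. The event classes and the functions `raw_test_stream` and `decode_elements` are hypothetical. They only loosely mirror TestStream's element and watermark events: watermarks are plain integers here, and processing-time events are omitted. The point is that the runner-side generator never needs the element coder.

```python
# Simulation only (not Beam's API): a TestStream whose primitive part emits raw
# bytes, followed by a decoding step that runs where the SDK coder lives.
from dataclasses import dataclass


@dataclass
class ElementEvent:
    encoded_elements: list  # bytes exactly as the submitting SDK encoded them


@dataclass
class WatermarkEvent:
    new_watermark: int


def raw_test_stream(events):
    """Runner side: replay events without knowing the element coder.
    Elements stay opaque bytes; watermark advances pass through."""
    for event in events:
        if isinstance(event, ElementEvent):
            for encoded in event.encoded_elements:
                yield ("element", encoded)
        else:
            yield ("watermark", event.new_watermark)


def decode_elements(stream, decode):
    """SDK side: the decoding step of the composite, using the SDK's own coder."""
    for kind, payload in stream:
        yield (kind, decode(payload) if kind == "element" else payload)


def decode_varint(data):
    """Unsigned base-128 varint decoder, standing in for the SDK's int coder."""
    result = shift = 0
    for byte in data:
        result |= (byte & 0x7F) << shift
        shift += 7
        if not byte & 0x80:
            break
    return result


events = [ElementEvent([b"\x01", b"\xac\x02"]), WatermarkEvent(1000)]
typed = list(decode_elements(raw_test_stream(events), decode_varint))
print(typed)  # [('element', 1), ('element', 300), ('watermark', 1000)]
```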