Specifically, "We have no way of telling from the Runner side, if a length prefix has been used or not." seems false. The runner has all the information since length prefix is a model coder. Didn't we agree that all coders should be self-delimiting in runner/SDK interactions, requiring length-prefix only when there is an opaque or dynamic-length value? I assume you mean that at runtime the worker for a given engine does not know?
Kenn

On Tue, Nov 5, 2019 at 3:19 PM Luke Cwik <lc...@google.com> wrote:
> +1 to what Robert said.
>
> On Tue, Nov 5, 2019 at 2:36 PM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> The Coder used for State/Timers in a StatefulDoFn is pulled out of the
>> input PCollection. If a Runner needs to partition by this coder, it
>> should ensure the coder of this PCollection matches the Coder used to
>> create the serialized bytes that are used for partitioning (whether or
>> not this is length-prefixed).
>>
>> Concretely, the graph looks like
>>
>> Runner                                      SDK Harness
>>
>> WriteToGbk
>>     |
>> ReadFromGbk
>>     |
>> RunnerMapFn.processKeyValue(key, value)
>>     |
>> WriteToDataChannel
>>     ------------------------>
>>                                ReadFromDataChannel
>>                                    |
>>                                  (pcIn)
>>                                    |
>>                                MyStatefulDoFn.process(key, value)
>>
>> Now the (key part of the) Coder of pcIn, which comes from the proto
>> that the Runner sent to the SDK, must match the (key part of the)
>> encoding used in WriteToGbk and ReadFromGbk. If a LengthPrefix is added
>> in one spot, it must be added in the other.
>>
>> [1]
>> https://github.com/apache/beam/blob/release-2.17.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L1183
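[To illustrate Robert's point inline: the same key yields different bytes with and without the prefix, so if one side length-prefixes and the other does not, state requests target the wrong partition. A rough Java sketch, again using outer-context encoding via CoderUtils; the byte comments are my expectation:]

  import java.util.Arrays;
  import org.apache.beam.sdk.coders.LengthPrefixCoder;
  import org.apache.beam.sdk.coders.StringUtf8Coder;
  import org.apache.beam.sdk.util.CoderUtils;

  public class KeyMismatch {
    public static void main(String[] args) throws Exception {
      // Key bytes as the runner might write them for partitioning / GBK.
      byte[] gbkKey =
          CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "key");
      // gbkKey   == [0x6b, 0x65, 0x79]        ("key", no prefix)

      // Key bytes as the SDK sends them if pcIn's key coder is wrapped in
      // a LengthPrefixCoder.
      byte[] stateKey = CoderUtils.encodeToByteArray(
          LengthPrefixCoder.of(StringUtf8Coder.of()), "key");
      // stateKey == [0x03, 0x6b, 0x65, 0x79]  (varint length 3, then "key")

      // Mismatch -> the state request's key does not land where the
      // element was partitioned.
      System.out.println(Arrays.equals(gbkKey, stateKey));  // false
    }
  }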
>> On Tue, Nov 5, 2019 at 1:25 PM Maximilian Michels <m...@apache.org> wrote:
>> >
>> > Hi,
>> >
>> > I wanted to get your opinion on something that I have been struggling
>> > with. It is about the coders for state requests in portable pipelines.
>> >
>> > In contrast to "classic" Beam, the Runner is not guaranteed to know
>> > which coder is used by the SDK. If the SDK happens to use a standard
>> > coder (also known as a model coder), we will also have it available at
>> > the Runner, at least if the Runner is written in one of the SDK
>> > languages (e.g. Java). However, when we do not have a standard coder,
>> > we treat the data from the SDK as a blob and simply pass it around as
>> > bytes.
>> >
>> > Problem
>> > =======
>> >
>> > In the case of state requests, which the SDK Harness issues to the
>> > Runner, we would like the key associated with the state request to
>> > match the key of the element which led to initiating the state
>> > request.
>> >
>> > Example:
>> >
>> > Runner                     SDK Harness
>> > ------                     -----------
>> >
>> > KV["key","value"]   -->    Process Element
>> >                            |
>> > LookupState("key")  <--    Request state of "key"
>> >                            |
>> > State["key"]        -->    Receive state
>> >
>> > For stateful DoFns, the Runner partitions the data based on the key.
>> > In Flink, this partitioning must not change during the lifetime of a
>> > pipeline because checkpointing otherwise breaks [0]. The key is
>> > extracted from the element and stored in encoded form.
>> >
>> > If we have a standard coder, it is basically the same as in the
>> > "classic" Runner, which takes the key and serializes it. However, when
>> > we have an SDK-specific coder, we basically do not know how it
>> > encodes. So far, we have been using the coder instantiated from the
>> > Proto, which is basically a LengthPrefixCoder[ByteArrayCoder] or
>> > similar [1]. We have had problems with this because the key encoding
>> > of Java SDK state requests did not match the key encoding on the
>> > Runner side [2]. In an attempt to fix those, it is now partly broken
>> > for portable Python pipelines. Partly, because it "only" affects
>> > non-standard coders.
>> >
>> > Non-standard coders yield the aforementioned
>> > LengthPrefixCoder[ByteArrayCoder]. Now, following the usual encoding
>> > scheme, we would simply encode the key using this coder. However, for
>> > state requests, the Python SDK leaves out the length prefix for
>> > certain coders, e.g. for primitives like int or byte. It is possible
>> > that one coder uses a length prefix while another doesn't. We have no
>> > way of telling from the Runner side if a length prefix has been used
>> > or not. This results in keys that do not match on the Runner side and
>> > breaks the partitioning.
>> >
>> > How to solve this?
>> > ==================
>> >
>> > (1) Should this simply be fixed on the Python SDK side? One fix would
>> > be to always prepend a length prefix to the key in state requests,
>> > even for primitive coders like VarInt which do not use one.
>> >
>> > OR
>> >
>> > (2) Should the Runner detect that a non-standard coder is used? If so,
>> > it should just pass the bytes from the SDK Harness through and never
>> > attempt to construct a coder based on the Proto.
>> >
>> > Thinking about it now, it seems pretty obvious that (2) is the most
>> > feasible way to avoid complications across all current and future SDKs
>> > for key encodings. Still, it is odd that the Proto contains coder
>> > information which is not usable.
>> >
>> > What do you think?
>> >
>> > Thanks,
>> > Max
>> >
>> > [0] It is possible to restart the pipeline and repartition the
>> > checkpointed data.
>> > [1]
>> > https://github.com/apache/beam/blob/c39752af5391fe698a2b4f1489c187ddd4d604c0/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L682
>> > [2] https://issues.apache.org/jira/browse/BEAM-8157
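For reference, a rough Java sketch of the difference between the status quo and option (2) on the Runner side (simplified; sdkKeyBytes stands in for whatever the SDK's key coder produced, and the real translation lives in the Flink translator linked above):

  import org.apache.beam.sdk.coders.ByteArrayCoder;
  import org.apache.beam.sdk.coders.LengthPrefixCoder;
  import org.apache.beam.sdk.util.CoderUtils;

  public class Option2Sketch {
    public static void main(String[] args) throws Exception {
      // Whatever the SDK's (unknown) key coder produced for the key "key".
      byte[] sdkKeyBytes = {0x6b, 0x65, 0x79};

      // Status quo (simplified): the Runner instantiates
      // LengthPrefixCoder[ByteArrayCoder] from the Proto and re-encodes,
      // prepending a prefix the SDK's state-request key may not carry.
      byte[] reEncoded = CoderUtils.encodeToByteArray(
          LengthPrefixCoder.of(ByteArrayCoder.of()), sdkKeyBytes);
      // reEncoded == [0x03, 0x6b, 0x65, 0x79]

      // Option (2): treat the SDK's bytes as opaque and partition on them
      // directly -- no reconstructed coder, no prefix mismatch.
      byte[] partitionKey = sdkKeyBytes;
    }
  }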