+1 to what Robert said.

On Tue, Nov 5, 2019 at 2:36 PM Robert Bradshaw <rober...@google.com> wrote:
> The Coder used for State/Timers in a StatefulDoFn is pulled out of the
> input PCollection. If a Runner needs to partition by this coder, it
> should ensure the coder of this PCollection matches the Coder used to
> create the serialized bytes that are used for partitioning (whether or
> not this is length-prefixed).
>
> Concretely, the graph looks like
>
>   Runner                                    SDK Harness
>
>   WriteToGbk
>      |
>   ReadFromGbk
>      |
>   RunnerMapFn.processKeyValue(key, value)
>      |
>   WriteToDataChannel
>                ------------------------>
>                                            ReadFromDataChannel
>                                               |
>                                            (pcIn)
>                                               |
>                                            MyStatefulDoFn.process(key, value)
>
> Now the (key part of the) Coder of pcIn, which comes from the proto
> that the Runner sent to the SDK, must match the (key part of the)
> encoding used in WriteToGbk and ReadFromGbk. If a LengthPrefix is added
> in one spot, it must be added in the other.
>
> [1] https://github.com/apache/beam/blob/release-2.17.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L1183
>
> On Tue, Nov 5, 2019 at 1:25 PM Maximilian Michels <m...@apache.org> wrote:
> >
> > Hi,
> >
> > I wanted to get your opinion on something that I have been struggling
> > with. It is about the coders for state requests in portable pipelines.
> >
> > In contrast to "classic" Beam, the Runner is not guaranteed to know
> > which coder is used by the SDK. If the SDK happens to use a standard
> > coder (also known as a model coder), we will also have it available
> > at the Runner, i.e. if the Runner is written in one of the SDK
> > languages (e.g. Java). However, when we do not have a standard coder,
> > we treat the data from the SDK as a blob and pass it around as bytes.
> >
> > Problem
> > =======
> >
> > For state requests, which the SDK Harness issues to the Runner, we
> > would like the key associated with the state request to match the key
> > of the element which led to initiating the state request.
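[Editor's note: Robert's point that both sides must agree on length-prefixing can be sketched in plain Python. This is illustrative only, not Beam's actual code; `varint_encode` is an assumption that mirrors Beam's base-128 varint scheme, and `raw_key` stands in for the output of some inner key coder.]

```python
# Illustrative sketch, not Beam's real implementation: why the runner-side
# GBK key encoding and the key coder of pcIn must agree on length-prefixing.

def varint_encode(n: int) -> bytes:
    """Base-128 varint, as used by Beam's length prefixes (assumption)."""
    out = bytearray()
    while True:
        bits = n & 0x7F
        n >>= 7
        if n:
            out.append(bits | 0x80)  # high bit set: more bytes follow
        else:
            out.append(bits)
            return bytes(out)

def length_prefixed(payload: bytes) -> bytes:
    """What a LengthPrefixCoder adds on top of the inner coder's bytes."""
    return varint_encode(len(payload)) + payload

raw_key = b"key"  # hypothetical output of some inner key coder

# The two wire forms of the same key differ, so a prefix added in
# WriteToGbk but not in pcIn's key coder (or vice versa) breaks the match:
assert length_prefixed(raw_key) == b"\x03key"
assert length_prefixed(raw_key) != raw_key
```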
> >
> > Example:
> >
> >   Runner                       SDK Harness
> >   ------                       -----------
> >
> >   KV["key","value"]   -->      Process Element
> >                                   |
> >   LookupState("key")  <--      Request state of "key"
> >      |
> >   State["key"]        -->      Receive state
> >
> > For stateful DoFns, the Runner partitions the data based on the key.
> > In Flink, this partitioning must not change during the lifetime of a
> > pipeline because checkpointing otherwise breaks [0]. The key is
> > extracted from the element and stored encoded.
> >
> > If we have a standard coder, it is basically the same as in the
> > "classic" Runner, which takes the key and serializes it. However,
> > when we have an SDK-specific coder, we do not know how it encodes. So
> > far, we have been using the coder instantiated from the Proto, which
> > is basically a LengthPrefixCoder[ByteArrayCoder] or similar [1]. We
> > have had problems with this because the key encoding of Java SDK
> > state requests did not match the key encoding on the Runner side [2].
> > In an attempt to fix those, it is now partly broken for portable
> > Python pipelines. Partly, because it "only" affects non-standard
> > coders.
> >
> > Non-standard coders yield the aforementioned
> > LengthPrefixCoder[ByteArrayCoder]. Following the usual encoding
> > scheme, we would simply encode the key using this coder. However, for
> > state requests, the Python SDK leaves out the length prefix for
> > certain coders, e.g. for primitives like int or byte. It is possible
> > that one coder uses a length prefix while another doesn't. We have no
> > way of telling, from the Runner side, whether a length prefix has
> > been used or not. This results in the keys not matching on the Runner
> > side, which breaks the partitioning.
> >
> > How to solve this?
> > ==================
> >
> > (1) Should this simply be fixed on the Python SDK side?
> > One fix would be to always prepend a length prefix to the key in
> > state requests, even for primitive coders like VarInt which do not
> > use one.
> >
> > OR
> >
> > (2) Should the Runner detect that a non-standard coder is used? If
> > so, it should just pass through the bytes from the SDK Harness and
> > never attempt to construct a coder from the Proto.
> >
> > Thinking about it now, it seems pretty obvious that (2) is the most
> > feasible way to avoid complications across all current and future
> > SDKs for key encodings. Still, it is odd that the Proto contains
> > coder information which is not usable.
> >
> > What do you think?
> >
> > Thanks,
> > Max
> >
> > [0] It is possible to restart the pipeline and repartition the
> > checkpointed data.
> > [1] https://github.com/apache/beam/blob/c39752af5391fe698a2b4f1489c187ddd4d604c0/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L682
> > [2] https://issues.apache.org/jira/browse/BEAM-8157
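[Editor's note: to make the failure mode and the appeal of option (2) concrete, here is a minimal, hedged sketch in plain Python with no Beam dependency. The helper names (`runner_partition_key`, `sdk_state_request_key`) are invented for illustration; the varint scheme is an assumption. It shows a runner that reconstructs keys as LengthPrefixCoder[ByteArrayCoder] disagreeing with an SDK that sends a primitive VarInt key without the prefix, and how passing the SDK's bytes through opaquely makes both sides agree by construction.]

```python
# Hedged sketch of the mismatch described in this thread; names are
# illustrative, not Beam's actual API.

def varint_encode(n: int) -> bytes:
    """Base-128 varint (assumption: mirrors Beam's VarInt encoding)."""
    out = bytearray()
    while True:
        bits = n & 0x7F
        n >>= 7
        if n:
            out.append(bits | 0x80)
        else:
            out.append(bits)
            return bytes(out)

def runner_partition_key(raw: bytes) -> bytes:
    """Runner side: key reconstructed as LengthPrefixCoder[ByteArrayCoder]."""
    return varint_encode(len(raw)) + raw

def sdk_state_request_key(n: int) -> bytes:
    """SDK side: a primitive VarInt key sent without a length prefix."""
    return varint_encode(n)

# The runner partitioned state under the prefixed form, but the state
# request arrives with the unprefixed form -- the lookup misses:
keyed_state = {runner_partition_key(sdk_state_request_key(42)): "state"}
assert sdk_state_request_key(42) not in keyed_state

# Option (2): treat the SDK's key bytes as opaque on both paths, so the
# partition key and the state-request key are identical by construction:
opaque_key = sdk_state_request_key(42)
keyed_state_opaque = {opaque_key: "state"}
assert opaque_key in keyed_state_opaque
```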