On Wed, Nov 6, 2019 at 2:55 AM Maximilian Michels <m...@apache.org> wrote: > > Let me try to clarify: > > > The Coder used for State/Timers in a StatefulDoFn is pulled out of the > > input PCollection. If a Runner needs to partition by this coder, it > > should ensure the coder of this PCollection matches with the Coder > > used to create the serialized bytes that are used for partitioning > > (whether or not this is length-prefixed). > > That is essentially what I had assumed when I wrote the code. The > problem is the coder can be "pulled out" in different ways. > > For example, let's say we have the following Proto PCollection coder > with non-standard coder "CustomCoder" as the key coder: > > KvCoder<CustomCoder, VarIntCoder> > > From the Runner side, this currently looks like the following: > > PCol: KvCoder<LengthPrefixCoder<ByteArrayCoder>, VarIntCoder> > Key: LengthPrefixCoder<ByteArrayCoder>
I think this is where the error is. If the proto references KvCoder<CustomCoder, VarIntCoder>, it should not be pulled out as KvCoder<LengthPrefixCoder<ByteArrayCoder>, VarIntCoder>, as that doesn't have the same encoding. Trying to instantiate such a coder should be an error. Instead, the runner knows ahead of time that it will need to instantiate this coder, and should update the bundle processor to specify KvCoder<LengthPrefixCoder<CustomCoder>, VarIntCoder> as the coder, so both sides can pull it out in a consistent way. When the coder is KvCoder<LengthPrefixCoder<CustomCoder>, VarIntCoder>, instantiating it as KvCoder<ByteArrayCoder, VarIntCoder> on the runner is of course OK, as they do have the same encoding. > At the SDK Harness, we have the coder available: > > PCol: KvCoder<CustomCoder, VarIntCoder> > Key: CustomCoder > > Currently, when the SDK Harness serializes a key for a state request, > the custom coder may happen to add a length prefix, or it may not. It > depends on the coder used. The correct behavior would be to use the same > representation as on the Runner side. > > > Specifically, "We have no way of telling from the Runner side, if a length > > prefix has been used or not." seems false > > The Runner cannot inspect an unknown coder, it only has the opaque Proto > information available which does not allow introspection of non-standard > coders. With the current state, the Runner may think the coder adds a > length prefix but the Python SDK worker could choose to add none. This > produces an inconsistent key encoding. See above. I think what's being conflated here is "the Coder has been wrapped in a LengthPrefixCoder" vs. "the coder does length prefixing." These are two orthogonal concepts. The runner in general only knows the former. > It looks like the key encoding for state requests on the Python SDK > Harness side is broken.
For transferring elements of a PCollection, the > coders are obviously working correctly, but for encoding solely the key > of an element, there is a consistency issue. > > > -Max > > On 06.11.19 05:35, Kenneth Knowles wrote: > > Specifically, "We have no way of telling from the Runner side, if a > > length prefix has been used or not." seems false. The runner has all the > > information since length prefix is a model coder. Didn't we agree that > > all coders should be self-delimiting in runner/SDK interactions, > > requiring length-prefix only when there is an opaque or dynamic-length > > value? I assume you mean that at runtime the worker for a given engine > > does not know? > > > > Kenn > > > > On Tue, Nov 5, 2019 at 3:19 PM Luke Cwik <lc...@google.com > > <mailto:lc...@google.com>> wrote: > > > > +1 to what Robert said. > > > > On Tue, Nov 5, 2019 at 2:36 PM Robert Bradshaw <rober...@google.com > > <mailto:rober...@google.com>> wrote: > > > > The Coder used for State/Timers in a StatefulDoFn is pulled out > > of the > > input PCollection. If a Runner needs to partition by this coder, it > > should ensure the coder of this PCollection matches with the Coder > > used to create the serialized bytes that are used for partitioning > > (whether or not this is length-prefixed). > > > > Concretely, the graph looks like > > > > > > Runner SDK Harness > > > > WriteToGbk > > | > > ReadFromGbk > > | > > RunnerMapFn.processKeyValue(key, value) > > | > > WriteToDataChannel > > ------------------------> > > ReadFromDataChannel > > | > > (pcIn) > > | > > MyStatefulDoFn.process(key, value) > > > > Now the (key part of the) Coder of pcIn, which comes from the proto > > that the Runner sent to the SDK, must match the (key part of the) > > encoding used in WriteToGbk and ReadFromGbk. If a LengthPrefix is > > added > > in one spot, it must be added in the other. 
> > > > > > [1] > > > > https://github.com/apache/beam/blob/release-2.17.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L1183 > > > > On Tue, Nov 5, 2019 at 1:25 PM Maximilian Michels > > <m...@apache.org <mailto:m...@apache.org>> wrote: > > > > > > Hi, > > > > > > I wanted to get your opinion on something that I have been > > struggling > > > with. It is about the coders for state requests in portable > > pipelines. > > > > > > In contrast to "classic" Beam, the Runner is not guaranteed > > to know > > > which coder is used by the SDK. If the SDK happens to use a > > standard > > > coder (also known as model coder), we will also have it > > available at the > > > Runner, i.e. if the Runner is written in one of the SDK > > languages (e.g. > > > Java). However, when we do not have a standard coder, we just > > treat the > > > data from the SDK as a blob and just pass it around as bytes. > > > > > > Problem > > > ======= > > > > > > In the case of state requests which the SDK Harness issues > > to the > > > Runner, we would like the key associated with the state > > request to > > > match the key of the element which led to initiating the > > state request. > > > > > > Example: > > > > > > Runner SDK Harness > > > ------ ----------- > > > > > > KV["key","value"] --> Process Element > > > | > > > LookupState("key") <-- Request state of "key" > > > | > > > State["key"] --> Receive state > > > > > > > > > For stateful DoFns, the Runner partitions the data based on > > the key. In > > > Flink, this partitioning must not change during the lifetime of a > > > pipeline because the checkpointing otherwise breaks[0]. The > > key is > > > extracted from the element and stored encoded. > > > > > > If we have a standard coder, it is basically the same as in the > > > "classic" Runner which takes the key and serializes it. > > However, when we > > > have an SDK-specific coder, we basically do not know how it > > encodes. 
So > > > far, we have been using the coder instantiated from the > > Proto, which is > > > basically a LengthPrefixCoder[ByteArrayCoder] or similar[1]. > > We have had > > > problems with this because the key encoding of Java SDK state > > requests > > > did not match the key encoding on the Runner side [2]. In an > > attempt to > > > fix those, it is now partly broken for portable Python pipelines. > > > Partly, because it "only" affects non-standard coders. > > > > > > Non-standard coders yield the aforementioned > > > LengthPrefixCoder[ByteArrayCoder]. Now, following the usual > > encoding > > > scheme, we would simply encode the key using this coder. > > However, for > > > state requests, the Python SDK leaves out the length prefix > > for certain > > > coders, e.g. for primitives like int or byte. It is possible > > that one > > > coder uses a length prefix, while another doesn't. We have no > > way of > > > telling from the Runner side, if a length prefix has been > > used or not. > > > This results in the keys not matching on the Runner side and the > > > partitioning being broken. > > > > > > > > > How to solve this? > > > ================== > > > > > > (1) Should this simply be fixed on the Python SDK side? One > > fix would be > > > to always append a length prefix to the key in state > > requests, even for > > > primitive coders like VarInt which do not use one. > > > > > > OR > > > > > > (2) Should the Runner detect that a non-standard coder is > > used? If so, > > > it should just pass the bytes from the SDK Harness and never > > make an > > > attempt to construct a coder based on the Proto. > > > > > > > > > Thinking about it now, it seems pretty obvious that (2) is > > the most > > > feasible way to avoid complications across all current and > > future SDKs > > > for key encodings. Still, it is odd that the Proto contains coder > > > information which is not usable. > > > > > > What do you think? 
> > > > > > > > > Thanks, > > > Max > > > > > > > > > [0] It is possible to restart the pipeline and repartition the > > > checkpointed data. > > > [1] > > > > > > > https://github.com/apache/beam/blob/c39752af5391fe698a2b4f1489c187ddd4d604c0/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L682 > > > [2] https://issues.apache.org/jira/browse/BEAM-8157 > >
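To make the length-prefix argument in this thread concrete, here is a small self-contained Python sketch. It is not Beam code: the varint framing below only mirrors what a LengthPrefixCoder does, and "CustomCoder" is a hypothetical stand-in for an opaque, non-standard coder the runner cannot introspect. The point it demonstrates is the one Robert makes above: LengthPrefixCoder[CustomCoder] on the SDK side and LengthPrefixCoder[ByteArrayCoder] on the runner side produce byte-for-byte identical encodings, so partitioning stays consistent; whereas if the SDK sends a raw, unprefixed key for a state request, the bytes no longer match what the runner partitioned by.

```python
def encode_varint(n: int) -> bytes:
    """Base-128 varint, the framing a length-prefix coder uses."""
    out = bytearray()
    while True:
        bits = n & 0x7F
        n >>= 7
        if n:
            out.append(bits | 0x80)
        else:
            out.append(bits)
            return bytes(out)

def decode_varint(buf: bytes, pos: int = 0):
    """Returns (value, new_pos)."""
    shift, result = 0, 0
    while True:
        b = buf[pos]
        pos += 1
        result |= (b & 0x7F) << shift
        if not (b & 0x80):
            return result, pos
        shift += 7

# Hypothetical non-standard coder: the runner only sees opaque bytes.
def custom_coder_encode(value: str) -> bytes:
    return value.encode("utf-8")[::-1]  # arbitrary opaque encoding

def length_prefix_encode(payload: bytes) -> bytes:
    return encode_varint(len(payload)) + payload

# SDK side: key encoded with LengthPrefixCoder[CustomCoder].
sdk_key_bytes = length_prefix_encode(custom_coder_encode("key"))

# Runner side: decodes the same bytes as LengthPrefixCoder[ByteArrayCoder],
# i.e. reads the length and treats the payload as a blob.
length, pos = decode_varint(sdk_key_bytes)
runner_blob = sdk_key_bytes[pos:pos + length]

# Re-encoding the runner's blob reproduces the SDK's bytes exactly,
# so both sides agree on the partitioning key.
assert length_prefix_encode(runner_blob) == sdk_key_bytes

# The inconsistency under discussion: if the SDK omits the prefix for a
# state-request key, the bytes no longer match what the runner used.
state_request_key = custom_coder_encode("key")  # no length prefix
assert state_request_key != sdk_key_bytes
```

This is also why option (2) is attractive: the runner never needs to understand CustomCoder at all, only the stable framing around it.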