Specifically, "We have no way of telling from the Runner side, if a length
prefix has been used or not." seems false. The runner has all the
information since length prefix is a model coder. Didn't we agree that all
coders should be self-delimiting in runner/SDK interactions, requiring
length-prefix only when there is an opaque or dynamic-length value? I
assume you mean that at runtime the worker for a given engine does not know?
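
For illustration, a minimal Python sketch of that distinction (assuming
the apache_beam coders module; the byte values shown are what the Beam
varint encoding happens to produce):

    from apache_beam.coders.coders import (
        BytesCoder, LengthPrefixCoder, VarIntCoder)

    # VarInt is self-delimiting: a decoder knows where the value ends.
    VarIntCoder().encode(300)                          # b'\xac\x02'

    # Raw bytes are opaque and dynamic-length, so they need a length
    # prefix before they can be embedded in a larger stream.
    LengthPrefixCoder(BytesCoder()).encode(b'opaque')  # b'\x06opaque'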

Kenn

On Tue, Nov 5, 2019 at 3:19 PM Luke Cwik <lc...@google.com> wrote:

> +1 to what Robert said.
>
> On Tue, Nov 5, 2019 at 2:36 PM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> The Coder used for State/Timers in a StatefulDoFn is pulled out of the
>> input PCollection. If a Runner needs to partition by this coder, it
>> should ensure the coder of this PCollection matches with the Coder
>> used to create the serialized bytes that are used for partitioning
>> (whether or not this is length-prefixed).
>>
>> Concretely, the graph looks like
>>
>>
>> Runner                          SDK Harness
>>
>> WriteToGbk
>>     |
>> ReadFromGbk
>>     |
>> RunnerMapFn.processKeyValue(key, value)
>>     |
>>     WriteToDataChannel
>>             ------------------------>
>>                  ReadFromDataChannel
>>                                |
>>                            (pcIn)
>>                                |
>>                   MyStatefulDoFn.process(key, value)
>>
>> Now the (key part of the) Coder of pcIn, which comes from the proto
>> that the Runner sent to the SDK, must match the (key part of the)
>> encoding used in WriteToGbk and ReadFromGbk. If a LengthPrefix is added
>> in one spot, it must be added in the other.
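>>
>> As a sketch of that invariant (hypothetical helper; Python for
>> brevity): whatever wrapping the Runner applies to the key coder around
>> the Gbk must be the very same wrapping it declares in pcIn's coder
>> proto:
>>
>>     from apache_beam.coders.coders import LengthPrefixCoder
>>
>>     def partition_key_coder(key_coder, length_prefixed):
>>         # Used both when writing/reading the Gbk *and* when building
>>         # the coder proto sent to the SDK, never on just one side.
>>         return LengthPrefixCoder(key_coder) if length_prefixed else key_coder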
>>
>>
>> [1]
>> https://github.com/apache/beam/blob/release-2.17.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L1183
>>
>> On Tue, Nov 5, 2019 at 1:25 PM Maximilian Michels <m...@apache.org> wrote:
>> >
>> > Hi,
>> >
>> > I wanted to get your opinion on something that I have been struggling
>> > with. It is about the coders for state requests in portable pipelines.
>> >
>> > In contrast to "classic" Beam, the Runner is not guaranteed to know
>> > which coder is used by the SDK. If the SDK happens to use a standard
>> > coder (also known as a model coder), it will also be available at the
>> > Runner, provided the Runner is written in one of the SDK languages
>> > (e.g. Java). However, when we do not have a standard coder, we treat
>> > the data from the SDK as a blob and just pass it around as bytes.
>> >
>> > Problem
>> > =======
>> >
>> > In the case of state requests, which the SDK Harness sends to the
>> > Runner, we would like the key associated with the state request to
>> > match the key of the element which initiated the state request.
>> >
>> > Example:
>> >
>> > Runner                 SDK Harness
>> > ------                 -----------
>> >
>> > KV["key","value"]  --> Process Element
>> >                                |
>> > LookupState("key") <-- Request state of "key"
>> >          |
>> >     State["key"]    --> Receive state
>> >
>> >
>> > For stateful DoFns, the Runner partitions the data based on the key. In
>> > Flink, this partitioning must not change during the lifetime of a
>> > pipeline because the checkpointing otherwise breaks[0]. The key is
>> > extracted from the element and stored encoded.
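>> >
>> > To make that concrete, a toy Python sketch of why the byte-level
>> > encoding matters for partitioning (zlib.crc32 just stands in for
>> > whatever hash the engine actually uses):
>> >
>> >     import zlib
>> >
>> >     def partition_for(encoded_key: bytes, num_partitions: int) -> int:
>> >         # The hash is taken over the encoded bytes, not the logical
>> >         # key, so two different encodings of the same key end up on
>> >         # different partitions.
>> >         return zlib.crc32(encoded_key) % num_partitions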
>> >
>> > If we have a standard coder, it is essentially the same as in the
>> > "classic" Runner, which takes the key and serializes it. However, when
>> > we have an SDK-specific coder, we do not know how it encodes. So far,
>> > we have been using the coder instantiated from the Proto, which is
>> > typically a LengthPrefixCoder[ByteArrayCoder] or similar[1]. We have
>> > had problems with this because the key encoding of Java SDK state
>> > requests did not match the key encoding on the Runner side [2]. In an
>> > attempt to fix those, it is now partly broken for portable Python
>> > pipelines ("partly" because it "only" affects non-standard coders).
>> >
>> > Non-standard coders yield the aforementioned
>> > LengthPrefixCoder[ByteArrayCoder]. Now, following the usual encoding
>> > scheme, we would simply encode the key using this coder. However, for
>> > state requests, the Python SDK leaves out the length prefix for certain
>> > coders, e.g. for primitives like int or byte. It is possible that one
>> > coder uses a length prefix while another doesn't. We have no way of
>> > telling from the Runner side if a length prefix has been used or not.
>> > This results in the keys not matching on the Runner side and the
>> > partitioning being broken.
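>> >
>> > A minimal sketch of the mismatch with the Beam Python coders
>> > (VarIntCoder stands in for any coder whose state-request key the SDK
>> > writes without a prefix):
>> >
>> >     from apache_beam.coders.coders import (
>> >         BytesCoder, LengthPrefixCoder, VarIntCoder)
>> >
>> >     key = 42
>> >
>> >     # What the SDK sends for a primitive coder: no length prefix.
>> >     sdk_bytes = VarIntCoder().encode(key)  # b'*'
>> >
>> >     # What the Runner computes from the coder in the Proto, i.e. a
>> >     # LengthPrefixCoder around an opaque blob.
>> >     runner_bytes = LengthPrefixCoder(BytesCoder()).encode(sdk_bytes)
>> >     # runner_bytes == b'\x01*'
>> >
>> >     assert sdk_bytes != runner_bytes  # the partitioning keys disagree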
>> >
>> >
>> > How to solve this?
>> > ==================
>> >
>> > (1) Should this simply be fixed on the Python SDK side? One fix would be
>> > to always add a length prefix to the key in state requests, even for
>> > primitive coders like VarInt which do not use one.
>> >
>> > OR
>> >
>> > (2) Should the Runner detect that a non-standard coder is used? If so,
>> > it should just pass the bytes from the SDK Harness and never make an
>> > attempt to construct a coder based on the Proto.
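>> >
>> > A rough sketch of (2), where the URNs are the real model-coder URNs
>> > but the helper names are hypothetical:
>> >
>> >     MODEL_CODER_URNS = {
>> >         'beam:coder:varint:v1',
>> >         'beam:coder:bytes:v1',
>> >         # ... and the rest of the standard coders
>> >     }
>> >
>> >     def key_bytes_for_partitioning(key_coder_proto, raw_key_bytes):
>> >         if key_coder_proto.spec.urn in MODEL_CODER_URNS:
>> >             # Standard coder: safe to instantiate from the Proto and
>> >             # interpret the bytes (hypothetical helper below).
>> >             return reencode_with_model_coder(key_coder_proto, raw_key_bytes)
>> >         # Non-standard coder: never reinterpret, pass the bytes through.
>> >         return raw_key_bytes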
>> >
>> >
>> > Thinking about it now, it seems pretty obvious that (2) is the most
>> > feasible way to avoid complications across all current and future SDKs
>> > for key encodings. Still, it is odd that the Proto contains coder
>> > information which is not usable.
>> >
>> > What do you think?
>> >
>> >
>> > Thanks,
>> > Max
>> >
>> >
>> > [0] It is possible to restart the pipeline and repartition the
>> > checkpointed data.
>> > [1]
>> >
>> https://github.com/apache/beam/blob/c39752af5391fe698a2b4f1489c187ddd4d604c0/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L682
>> > [2] https://issues.apache.org/jira/browse/BEAM-8157
>>
>
