On Thu, Nov 7, 2019 at 8:22 AM Robert Bradshaw <rober...@google.com> wrote:
On Thu, Nov 7, 2019 at 6:26 AM Maximilian Michels <m...@apache.org> wrote:
>
> Thanks for the feedback thus far. Some more comments:
>
> > Instead, the runner knows ahead of time that it
> > will need to instantiate this coder, and should update the bundle
> > processor to specify KvCoder<LengthPrefixCoder<CustomCoder>,
> > VarIntCoder> as the coder so both can pull it out in a consistent
> > way.
>
> By "update the bundle processor", do you mean modifying the
> ProcessBundleDescriptor's BagUserState with the correct key coder?
> Conceptually that is possible, but the current implementation does not
> allow for this to happen:
> https://github.com/apache/beam/blob/076a037e53ca39b61c1dfeb580527bc8d0371dc1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java#L284
> It enforces ByteString, which does not tell the SDK Harness anything
> about the desired encoding.
I meant update the ProcessBundleDescriptor proto that is sent to the SDK
(https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L140),
essentially option (1).
For clarity, the "key" coder is specified by the stateful ParDo's main
input PCollection. This means that the ProcessBundleDescriptor should
include the length prefix both in the remote gRPC port specification AND
in the PCollection that follows it, which is the main input for the
stateful ParDo. That way, the wire format the runner sends for the "key"
is exactly the wire format it will receive in state requests.
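To make that agreement concrete, here is a minimal Python sketch of the "varint length, then payload" wire format. The helper names are illustrative stand-ins, not Beam's actual coder classes:

```python
def encode_varint(n: int) -> bytes:
    """Encode a non-negative int as a base-128 varint (the format length
    prefixes use on the wire)."""
    out = bytearray()
    while True:
        bits = n & 0x7F
        n >>= 7
        if n:
            out.append(bits | 0x80)  # continuation bit set: more bytes follow
        else:
            out.append(bits)
            return bytes(out)

def length_prefix(payload: bytes) -> bytes:
    """Wrap an opaque payload the way a LengthPrefixCoder would:
    varint length, then the raw bytes."""
    return encode_varint(len(payload)) + payload

# SDK side: a custom coder's output, wrapped in a length prefix.
custom_encoded_key = b"\x01custom-key"  # opaque to the runner
sdk_wire_key = length_prefix(custom_encoded_key)

# Runner side: it cannot decode CustomCoder, but it produces the same
# bytes by treating the payload as raw bytes under the same prefix
# (the LengthPrefixCoder<ByteArrayCoder> view).
runner_wire_key = length_prefix(custom_encoded_key)

assert sdk_wire_key == runner_wire_key  # identical wire format => state keys match
```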
I see what you mean, Max:
https://github.com/apache/beam/blob/076a037e53ca39b61c1dfeb580527bc8d0371dc1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java#L284
could change to represent the actual coder (whether it be
LengthPrefixCoder<BytesCoder> or some other model coder combination).
Currently that logic assumes that the runner can perform the backwards
mapping by decoding the bytestring with the appropriate coder.
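That backwards mapping amounts to stripping the length prefix to recover the raw key bytes. A sketch under the same illustrative assumptions (the helper names are mine, not Beam's):

```python
def decode_varint(buf: bytes, pos: int = 0):
    """Decode a base-128 varint; returns (value, position after the varint)."""
    result = shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not (byte & 0x80):  # continuation bit clear: last byte
            return result, pos
        shift += 7

def strip_length_prefix(wire: bytes) -> bytes:
    """Runner-side view of LengthPrefixCoder<ByteArrayCoder>: recover the
    opaque key payload without understanding the SDK's coder."""
    size, pos = decode_varint(wire)
    return wire[pos:pos + size]

# A state-request key arrives length-prefixed; the runner keeps only the
# payload as its (opaque) partitioning key.
assert strip_length_prefix(b"\x03key") == b"key"
```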
> Since the above does not seem feasible, I see the following options:
>
> (1) Modify the pipeline Proto before the translation and wrap a
> LengthPrefixCoder around non-standard key coders for stateful
> transforms. This would change the encoding for the entire element, to
> ensure that the key coder for state requests from the SDK Harness
> contains a LengthPrefixCoder. Not optimal.
Yes. The contract should be that both the runner and SDK use the
coders that are specified in the proto. The runner controls the proto,
and should ensure it only sends protos whose SDK responses it will be
able to handle. I'm not seeing why this is sub-optimal.
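As a rough sketch of what this option-(1) rewrite could look like on the runner side. The dict-based "proto" and the recursion are only an illustration; Beam's real pipeline proto and coder registry differ:

```python
# Coder URNs this (hypothetical) runner understands.
STANDARD_CODER_URNS = {
    "beam:coder:varint:v1",
    "beam:coder:bytes:v1",
    "beam:coder:kv:v1",
    "beam:coder:length_prefix:v1",
}

def wrap_if_unknown(coder: dict) -> dict:
    """Wrap any coder the runner does not understand in a LengthPrefixCoder,
    recursing into component coders (e.g. the key/value coders of a KV)."""
    if coder["urn"] in STANDARD_CODER_URNS:
        return dict(coder,
                    components=[wrap_if_unknown(c)
                                for c in coder.get("components", [])])
    return {"urn": "beam:coder:length_prefix:v1", "components": [coder]}

kv = {"urn": "beam:coder:kv:v1", "components": [
    {"urn": "my_sdk:coder:custom:v1", "components": []},  # non-standard key coder
    {"urn": "beam:coder:varint:v1", "components": []},
]}
rewritten = wrap_if_unknown(kv)

# The key coder is now LengthPrefixCoder(CustomCoder), so both sides
# agree on the wire format for elements and state-request keys alike.
assert rewritten["components"][0]["urn"] == "beam:coder:length_prefix:v1"
```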
> (2) Add a new method WireCoders#instantiateRunnerWireKeyCoder which
> returns the correct key coder, i.e. for standard coders, the concrete
> coder, and for non-standard coders a ByteArrayCoder. We also need to
> ensure the key encoding on the Runner side is OUTER context, to avoid
> adding a length prefix to the encoded bytes. Basically, the
> non-standard coders result in a NOOP coder which does not touch the
> key bytes.
I'd really like to avoid implicit agreements where the coder that
should actually be used differs from what's specified in the proto
depending on the context.
> (3) Patch the Python SDK to ensure non-standard state key coders are
> always wrapped in a LengthPrefixCoder. That way, we can keep the
> existing logic on the Runner side.
The key concept here is not "standard coder" but "coder that the
runner does not understand." This knowledge is only in the runner.
Also has the downside of (2).
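The distinction between "standard coder" and "coder that the runner does not understand" can be made explicit by keying the decision off each runner's own capability set. A hypothetical sketch (the URN set and helper are mine, not Beam's API):

```python
def runner_understands(coder: dict, known_urns: frozenset) -> bool:
    """A coder is usable by a runner only if its URN and all of its
    component coders' URNs are in that runner's known set."""
    return coder["urn"] in known_urns and all(
        runner_understands(c, known_urns)
        for c in coder.get("components", ()))

# The set of URNs one particular runner happens to know.
JAVA_RUNNER_URNS = frozenset({
    "beam:coder:kv:v1", "beam:coder:varint:v1",
    "beam:coder:length_prefix:v1", "beam:coder:bytes:v1",
})

kv_with_custom_key = {"urn": "beam:coder:kv:v1", "components": [
    {"urn": "my_sdk:coder:custom:v1"},
    {"urn": "beam:coder:varint:v1"},
]}

# The decision is per-runner, not global: a different runner (or an SDK
# in the same language) might well understand my_sdk:coder:custom:v1.
assert not runner_understands(kv_with_custom_key, JAVA_RUNNER_URNS)
```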
> Option (2) seems like the most practical.
>
> -Max
>
> On 06.11.19 17:26, Robert Bradshaw wrote:
> > On Wed, Nov 6, 2019 at 2:55 AM Maximilian Michels <m...@apache.org> wrote:
> >>
> >> Let me try to clarify:
> >>
> >>> The Coder used for State/Timers in a StatefulDoFn is pulled out
> >>> of the input PCollection. If a Runner needs to partition by this
> >>> coder, it should ensure the coder of this PCollection matches with
> >>> the Coder used to create the serialized bytes that are used for
> >>> partitioning (whether or not this is length-prefixed).
> >>
> >> That is essentially what I had assumed when I wrote the code. The
> >> problem is the coder can be "pulled out" in different ways.
> >>
> >> For example, let's say we have the following Proto PCollection
> >> coder with the non-standard coder "CustomCoder" as the key coder:
> >>
> >> KvCoder<CustomCoder, VarIntCoder>
> >>
> >> From the Runner side, this currently looks like the following:
> >>
> >> PCol: KvCoder<LengthPrefixCoder<ByteArrayCoder>, VarIntCoder>
> >> Key: LengthPrefixCoder<ByteArrayCoder>
> >
> > This is, I think, where the error is. If the proto references
> > KvCoder<CustomCoder, VarIntCoder> it should not be pulled out as
> > KvCoder<LengthPrefixCoder<ByteArrayCoder>, VarIntCoder>, as that
> > doesn't have the same encoding. Trying to instantiate such a coder
> > should be an error. Instead, the runner knows ahead of time that it
> > will need to instantiate this coder, and should update the bundle
> > processor to specify KvCoder<LengthPrefixCoder<CustomCoder>,
> > VarIntCoder> as the coder so both can pull it out in a consistent
> > way.
> >
> > When the coder is KvCoder<LengthPrefixCoder<CustomCoder>,
> > VarIntCoder>, instantiating it as KvCoder<ByteArrayCoder,
> > VarIntCoder> on the runner is of course OK, as they do have the
> > same encoding.
> >
> >> At the SDK Harness, we have the coder available:
> >>
> >> PCol: KvCoder<CustomCoder, VarIntCoder>
> >> Key: CustomCoder
> >>
> >> Currently, when the SDK Harness serializes a key for a state
> >> request, the custom coder may happen to add a length prefix, or it
> >> may not. It depends on the coder used. The correct behavior would
> >> be to use the same representation as on the Runner side.
> >>
> >>> Specifically, "We have no way of telling from the Runner side, if
> >>> a length prefix has been used or not." seems false
> >>
> >> The Runner cannot inspect an unknown coder; it only has the opaque
> >> Proto information available, which does not allow introspection of
> >> non-standard coders. With the current state, the Runner may think
> >> the coder adds a length prefix but the Python SDK worker could
> >> choose to add none. This produces an inconsistent key encoding.
> >> See above.
> >
> > I think what's being conflated here is "the Coder has been wrapped
> > in a LengthPrefixCoder" vs. "the coder does length prefixing."
> > These are two orthogonal concepts. The runner in general only knows
> > the former.
> >
> >> It looks like the key encoding for state requests on the Python
> >> SDK Harness side is broken. For transferring elements of a
> >> PCollection, the coders are obviously working correctly, but for
> >> encoding solely the key of an element, there is a consistency
> >> issue.
> >>
> >>
> >> -Max
> >>
> >> On 06.11.19 05:35, Kenneth Knowles wrote:
> >>> Specifically, "We have no way of telling from the Runner side, if
> >>> a length prefix has been used or not." seems false. The runner has
> >>> all the information, since length prefix is a model coder. Didn't
> >>> we agree that all coders should be self-delimiting in runner/SDK
> >>> interactions, requiring a length prefix only when there is an
> >>> opaque or dynamic-length value? I assume you mean that at runtime
> >>> the worker for a given engine does not know?
> >>>
> >>> Kenn
> >>>
> >>> On Tue, Nov 5, 2019 at 3:19 PM Luke Cwik <lc...@google.com> wrote:
> >>>
> >>> +1 to what Robert said.
> >>>
> >>> On Tue, Nov 5, 2019 at 2:36 PM Robert Bradshaw <rober...@google.com> wrote:
> >>>
> >>> The Coder used for State/Timers in a StatefulDoFn is pulled out
> >>> of the input PCollection. If a Runner needs to partition by this
> >>> coder, it should ensure the coder of this PCollection matches
> >>> with the Coder used to create the serialized bytes that are used
> >>> for partitioning (whether or not this is length-prefixed).
> >>>
> >>> Concretely, the graph looks like
> >>>
> >>>
> >>> Runner SDK Harness
> >>>
> >>> WriteToGbk
> >>> |
> >>> ReadFromGbk
> >>> |
> >>> RunnerMapFn.processKeyValue(key, value)
> >>> |
> >>> WriteToDataChannel
> >>> ------------------------>
> >>> ReadFromDataChannel
> >>> |
> >>> (pcIn)
> >>> |
> >>> MyStatefulDoFn.process(key, value)
> >>>
> >>> Now the (key part of the) Coder of pcIn, which comes from the
> >>> proto that the Runner sent to the SDK, must match the (key part
> >>> of the) encoding used in WriteToGbk and ReadFromGbk. If a
> >>> LengthPrefix is added in one spot, it must be added in the other.
> >>>
> >>>
> >>> [1]
> >>> https://github.com/apache/beam/blob/release-2.17.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L1183
> >>>
> >>> On Tue, Nov 5, 2019 at 1:25 PM Maximilian Michels
> >>> <m...@apache.org> wrote:
> >>> >
> >>> > Hi,
> >>> >
> >>> > I wanted to get your opinion on something that I have been
> >>> > struggling with. It is about the coders for state requests in
> >>> > portable pipelines.
> >>> >
> >>> > In contrast to "classic" Beam, the Runner is not guaranteed to
> >>> > know which coder is used by the SDK. If the SDK happens to use a
> >>> > standard coder (also known as a model coder), we will also have
> >>> > it available at the Runner, i.e. if the Runner is written in one
> >>> > of the SDK languages (e.g. Java). However, when we do not have a
> >>> > standard coder, we just treat the data from the SDK as a blob
> >>> > and pass it around as bytes.
> >>> >
> >>> > Problem
> >>> > =======
> >>> >
> >>> > In the case of state requests which the SDK Harness issues to
> >>> > the Runner, we would like the key associated with the state
> >>> > request to match the key of the element which led to initiating
> >>> > the state request.
> >>> >
> >>> > Example:
> >>> >
> >>> > Runner SDK Harness
> >>> > ------ -----------
> >>> >
> >>> > KV["key","value"] --> Process Element
> >>> > |
> >>> > LookupState("key") <-- Request state of "key"
> >>> > |
> >>> > State["key"] --> Receive state
> >>> >
> >>> >
> >>> > For stateful DoFns, the Runner partitions the data based on the
> >>> > key. In Flink, this partitioning must not change during the
> >>> > lifetime of a pipeline because the checkpointing otherwise
> >>> > breaks [0]. The key is extracted from the element and stored
> >>> > encoded.
> >>> >
> >>> > If we have a standard coder, it is basically the same as in the
> >>> > "classic" Runner, which takes the key and serializes it.
> >>> > However, when we have an SDK-specific coder, we do not know how
> >>> > it encodes. So far, we have been using the coder instantiated
> >>> > from the Proto, which is basically a
> >>> > LengthPrefixCoder[ByteArrayCoder] or similar [1]. We have had
> >>> > problems with this because the key encoding of Java SDK state
> >>> > requests did not match the key encoding on the Runner side [2].
> >>> > In an attempt to fix those, it is now partly broken for portable
> >>> > Python pipelines. Partly, because it "only" affects non-standard
> >>> > coders.
> >>> >
> >>> > Non-standard coders yield the aforementioned
> >>> > LengthPrefixCoder[ByteArrayCoder]. Now, following the usual
> >>> > encoding scheme, we would simply encode the key using this
> >>> > coder. However, for state requests, the Python SDK leaves out
> >>> > the length prefix for certain coders, e.g. for primitives like
> >>> > int or byte. It is possible that one coder uses a length prefix
> >>> > while another doesn't. We have no way of telling from the
> >>> > Runner side if a length prefix has been used or not. This
> >>> > results in the keys not matching on the Runner side and the
> >>> > partitioning being broken.
> >>> >
> >>> >
> >>> > How to solve this?
> >>> > ==================
> >>> >
> >>> > (1) Should this simply be fixed on the Python SDK side? One fix
> >>> > would be to always append a length prefix to the key in state
> >>> > requests, even for primitive coders like VarInt which do not
> >>> > use one.
> >>> >
> >>> > OR
> >>> >
> >>> > (2) Should the Runner detect that a non-standard coder is used?
> >>> > If so, it should just pass the bytes from the SDK Harness and
> >>> > never make an attempt to construct a coder based on the Proto.
> >>> >
> >>> >
> >>> > Thinking about it now, it seems pretty obvious that (2) is the
> >>> > most feasible way to avoid complications across all current and
> >>> > future SDKs for key encodings. Still, it is odd that the Proto
> >>> > contains coder information which is not usable.
> >>> >
> >>> > What do you think?
> >>> >
> >>> >
> >>> > Thanks,
> >>> > Max
> >>> >
> >>> >
> >>> > [0] It is possible to restart the pipeline and repartition the
> >>> > checkpointed data.
> >>> > [1]
> >>> > https://github.com/apache/beam/blob/c39752af5391fe698a2b4f1489c187ddd4d604c0/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L682
> >>> > [2] https://issues.apache.org/jira/browse/BEAM-8157
> >>>