Yes, we should be enforcing keyness (and the use of KvCoder) with stateful
DoFns, similar to what we do for GBKs. See e.g.
https://github.com/apache/beam/pull/6304#issuecomment-421935375

(This possibly relates to a long-standing issue that the coder inference
should be moved up into construction, or at least before we pass the graph
to the runner.)
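A minimal sketch of what enforcing keyness before the graph reaches the runner could look like. It assumes a simplified, hypothetical coder description (`CoderSpec`) and a hypothetical check (`validate_stateful_input`); neither is a Beam API. The URN strings are Beam's standard coder URNs, used here only as identifiers.

```python
from dataclasses import dataclass
from typing import Tuple


# Hypothetical, simplified stand-in for the coder attached to a PCollection
# in the pipeline graph; this is NOT Beam's real Coder class.
@dataclass(frozen=True)
class CoderSpec:
    urn: str
    components: Tuple["CoderSpec", ...] = ()


# Standard Beam coder URNs, used here only as illustrative identifiers.
KV_URN = "beam:coder:kv:v1"
BYTES_URN = "beam:coder:bytes:v1"


def validate_stateful_input(transform_name: str, input_coder: CoderSpec) -> CoderSpec:
    """Fail at construction time if a stateful DoFn's input is not KV-coded.

    Returns the key coder, which a runner needs in order to extract keys
    and partition state by key.
    """
    if input_coder.urn != KV_URN or len(input_coder.components) != 2:
        raise ValueError(
            f"{transform_name}: stateful DoFns require KV input, "
            f"but the input coder is {input_coder.urn!r}"
        )
    return input_coder.components[0]


kv = CoderSpec(KV_URN, (CoderSpec(BYTES_URN), CoderSpec(BYTES_URN)))
print(validate_stateful_input("CountPerKey", kv).urn)  # beam:coder:bytes:v1
try:
    validate_stateful_input("CountPerKey", CoderSpec(BYTES_URN))
except ValueError as err:
    print(err)  # rejected: a plain bytes coder carries no key structure
```

Doing the check at construction time means a misconfigured pipeline fails when it is built rather than inside the runner.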

On Wed, Oct 17, 2018 at 2:52 PM Maximilian Michels <[email protected]> wrote:

> Hi everyone,
>
> While integrating portable state with the FlinkRunner, I hit a problem
> and wanted to get your opinion.
>
> Stateful DoFns require their input to be KV records. The reason for this
> is that state is isolated by key. The (non-portable) FlinkRunner uses
> Flink's `keyBy(key)` construct to partition state by key [1].
>
> That works fine for portable Java pipelines, where we enforce the `KV`
> class for stateful DoFns. After running tests with the Python SDK, I
> came to the conclusion that tuples, e.g. `(key, value)`, which are used
> for KV functionality, do not go through the KvCoder but are encoded
> using a byte array encoder.
>
> How do we infer the key in the Runner from an opaque sequence of bytes?
> Should we also require the KvCoder for stateful DoFns in the Python SDK?
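A simplified sketch of why a KV encoding lets the runner find the key in the bytes while an opaque blob does not. The length-prefixed layout and the helper names (`encode_kv`, `extract_key`, `partition_for`) are hypothetical; this is loosely modeled on a coder that writes the key before the value and is not Beam's exact wire format.

```python
import struct
import zlib


def encode_kv(key: bytes, value: bytes) -> bytes:
    """Simplified KV encoding: 4-byte big-endian key length, key, then value.

    The length prefix marks where the key ends, so a runner can read the key
    without knowing anything about the value's type.
    """
    return struct.pack(">I", len(key)) + key + value


def extract_key(encoded: bytes) -> bytes:
    """Recover the key bytes from an encode_kv() payload."""
    (key_len,) = struct.unpack_from(">I", encoded, 0)
    return encoded[4:4 + key_len]


def partition_for(encoded: bytes, parallelism: int) -> int:
    """Pick a parallel instance from the key bytes alone."""
    return zlib.crc32(extract_key(encoded)) % parallelism


kv_bytes = encode_kv(b"user-42", b"click")
print(extract_key(kv_bytes))  # b'user-42'
# A tuple serialized into one opaque blob has no such boundary: the runner
# cannot tell which bytes belong to the key, so it cannot do this.
```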
>
> Thanks,
> Max
>
> [1]
>
> https://github.com/apache/beam/blob/22f59deaf2a0aa77d98f2f024ce4b2a399014382/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L471
>
