Yes, we should be enforcing keyness (and the use of a KeyCoder) for stateful DoFns, similar to what we do for GBKs. See e.g. https://github.com/apache/beam/pull/6304#issuecomment-421935375
(This possibly relates to a long-standing issue that the coder inference should be moved up into construction, or at least before we pass the graph to the runner.)

On Wed, Oct 17, 2018 at 2:52 PM Maximilian Michels <[email protected]> wrote:

> Hi everyone,
>
> While integrating portable state with the FlinkRunner, I hit a problem
> and wanted to get your opinion.
>
> Stateful DoFns require their input to be KV records. The reason for this
> is that state is isolated by key. The (non-portable) FlinkRunner uses
> Flink's `keyBy(key)` construct to partition state by key [1].
>
> That works fine for portable Java pipelines where we enforce the `KV`
> class for Stateful DoFns. After running tests with the Python SDK, I
> came to the conclusion that tuples, e.g. `(key, value)` which are used
> for KV functionality, do not go through the KvCoder but are encoded
> using a byte array encoder.
>
> How do we infer the key in the Runner from an opaque sequence of bytes?
> Should we also require the KvCoder for stateful DoFns in the Python SDK?
>
> Thanks,
> Max
>
> [1]
> https://github.com/apache/beam/blob/22f59deaf2a0aa77d98f2f024ce4b2a399014382/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L471
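
To illustrate why the coder matters for the question above, here is a rough sketch in plain Python. It does not use Beam, and the wire format is an assumption made up for illustration (a 4-byte length prefix before the key), not Beam's actual KvCoder encoding. The helper names `encode_kv`, `extract_key` and `partition_for` are hypothetical. The point is only that a structured key/value encoding lets a runner slice out the key bytes without understanding the value, whereas a tuple serialized as one opaque blob gives it nothing to partition on.

```python
import pickle
import struct
import zlib


def encode_kv(key: bytes, value: bytes) -> bytes:
    """Hypothetical KV encoding: 4-byte big-endian key length, key, then value."""
    return struct.pack(">I", len(key)) + key + value


def extract_key(encoded: bytes) -> bytes:
    """Slice out the key bytes without decoding the value."""
    (key_len,) = struct.unpack_from(">I", encoded, 0)
    return encoded[4:4 + key_len]


def partition_for(encoded: bytes, num_partitions: int) -> int:
    """Pick a partition from the encoded key alone (stand-in for a keyBy)."""
    return zlib.crc32(extract_key(encoded)) % num_partitions


# Element encoded with the structured (hypothetical) KV format.
kv_element = encode_kv(b"user-1", pickle.dumps({"clicks": 3}))

# The same logical element serialized as a single opaque blob: the key is
# in there somewhere, but only code that can unpickle it can find it.
opaque_element = pickle.dumps((b"user-1", {"clicks": 3}))

print(extract_key(kv_element))
print(partition_for(kv_element, 4))
```

With the structured form, two elements that share a key always land in the same partition regardless of their values, which is the property per-key state relies on.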
