Thanks Robert. I was able to get it working by adding this to the
transform before my stateful DoFn:
.with_output_types(typehints.KV[K, V])
For some reason `.with_input_types(typehints.KV[K, V])` on my stateful
DoFn did not work.
Until we enforce KV during pipeline construction, we will have to throw
an informative exception in the Runner.
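A minimal sketch of such a runner-side guard, assuming the runner can see the URN of the stateful DoFn's input coder. `beam:coder:kv:v1` is the standard KV coder URN from Beam's runner API. The function `check_stateful_input` and the exception `StatefulInputNotKeyedError` are hypothetical names, not existing Beam APIs:

```python
# Hypothetical runner-side guard: the function and exception names are
# illustrative only. "beam:coder:kv:v1" is Beam's standard KV coder URN.
KV_CODER_URN = "beam:coder:kv:v1"


class StatefulInputNotKeyedError(ValueError):
    """Raised when a stateful DoFn's input is not KV-encoded (hypothetical)."""


def check_stateful_input(transform_name: str, input_coder_urn: str) -> None:
    """Fail fast with an actionable message instead of a cryptic runtime error."""
    if input_coder_urn != KV_CODER_URN:
        raise StatefulInputNotKeyedError(
            f"Stateful DoFn in '{transform_name}' requires KV input so that "
            f"state can be partitioned by key, but its input uses coder "
            f"'{input_coder_urn}'. In the Python SDK, declare the element type "
            f"of the preceding transform, e.g. "
            f".with_output_types(typehints.KV[K, V])."
        )
```

A KV-coded input passes silently; anything else (for example a plain bytes coder) raises with the transform name, the offending coder, and the workaround described above.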
On 17.10.18 15:03, Robert Bradshaw wrote:
Yes, we should be enforcing keyness (and the use of KvCoder) for stateful
DoFns, similar to what we do for GBKs. See e.g.
https://github.com/apache/beam/pull/6304#issuecomment-421935375
(This possibly relates to a long-standing issue that the coder inference
should be moved up into construction, or at least before we pass the
graph to the runner.)
On Wed, Oct 17, 2018 at 2:52 PM Maximilian Michels wrote:
Hi everyone,
While integrating portable state with the FlinkRunner, I hit a problem
and wanted to get your opinion.
Stateful DoFns require their input to be KV records. The reason for this is
that state is isolated by key. The (non-portable) FlinkRunner uses
Flink's `keyBy(key)` construct to partition state by key [1].
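To illustrate why the key must be extractable, here is a plain-Python simulation of per-key state isolation (a hypothetical sketch, not Flink or Beam code): each key gets its own state cell, which is what partitioning with `keyBy` provides, and the loop only works because every element is a `(key, value)` pair.

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, List, Tuple


def run_stateful_count(
    elements: Iterable[Tuple[Hashable, object]]
) -> List[Tuple[Hashable, int]]:
    """Emit (key, running_count) per element, keeping one counter per key.

    Hypothetical helper: it mimics a stateful DoFn whose state is scoped to
    the element's key, as keyed partitioning would give it.
    """
    state: Dict[Hashable, int] = defaultdict(int)  # one isolated cell per key
    output: List[Tuple[Hashable, int]] = []
    for key, _value in elements:  # fails if an element is not a (key, value) pair
        state[key] += 1
        output.append((key, state[key]))
    return output


print(run_stateful_count([("a", 1), ("b", 2), ("a", 3)]))
# [('a', 1), ('b', 1), ('a', 2)]
```

The counter for `"a"` never sees updates made under `"b"`; without a way to obtain the key from each element, the runner could not route elements to the right state cell.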
That works fine for portable Java pipelines, where we enforce the `KV`
class for stateful DoFns. After running tests with the Python SDK, I
came to the conclusion that tuples, e.g. `(key, value)`, which are used
for KV functionality, do not go through the KvCoder but are encoded
using a byte array encoder.
How do we infer the key in the Runner from an opaque sequence of bytes?
Should we also require the KvCoder for stateful DoFns in the Python SDK?
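To make the question concrete, the sketch below contrasts a pickled tuple (one opaque blob with no visible key boundary) with a simplified KV layout in which the key bytes carry a varint length prefix, loosely in the spirit of a KvCoder whose key coder is length-prefixed. The layout and the helper names (`encode_kv`, `extract_key`) are assumptions for illustration, not Beam's exact wire format or API:

```python
import pickle
from typing import Tuple


def encode_varint(n: int) -> bytes:
    """Encode a non-negative int as an unsigned LEB128-style varint."""
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def decode_varint(data: bytes, pos: int = 0) -> Tuple[int, int]:
    """Return (value, position just after the varint)."""
    result = shift = 0
    while True:
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def encode_kv(key_bytes: bytes, value_bytes: bytes) -> bytes:
    """Hypothetical layout: varint(len(key)) + key + value."""
    return encode_varint(len(key_bytes)) + key_bytes + value_bytes


def extract_key(encoded: bytes) -> bytes:
    """What a runner could do: slice out the key bytes without decoding types."""
    length, pos = decode_varint(encoded)
    return encoded[pos:pos + length]


# A pickled tuple is a single opaque blob: nothing marks where the key ends.
opaque = pickle.dumps(("user-1", 42))
# The length-prefixed layout exposes the key bytes directly.
keyed = encode_kv(b"user-1", pickle.dumps(42))
print(extract_key(keyed))  # b'user-1'
```

The runner never needs to understand the key's type; comparing or hashing the extracted key bytes is enough to partition state, which is not possible with the opaque blob.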
Thanks,
Max
[1]
https://github.com/apache/beam/blob/22f59deaf2a0aa77d98f2f024ce4b2a399014382/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L471