I think Create() simply ignores any type hints it's given (which should be fixed).
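
To illustrate why the missing KvCoder matters for the question quoted below ("How do we infer the key in the Runner from an opaque sequence of bytes?"), here is a minimal sketch in plain Python. It does not use Beam's coders or its wire format. The function names (`encode_opaque`, `encode_kv`, `extract_key`, `decode_kv`) and the 4-byte length prefix are assumptions made up for this example. The idea it shows: a key-aware encoding lets a runner slice out the key bytes without decoding the value, while an opaque blob does not.

```python
import pickle
import struct


def encode_opaque(element):
    # Hypothetical stand-in for a generic fallback encoding: the whole tuple
    # becomes one blob, so the key boundary is not visible to the runner.
    return pickle.dumps(element)


def encode_kv(key: bytes, value: bytes) -> bytes:
    # Hypothetical key-aware encoding: a 4-byte big-endian key length,
    # followed by the key bytes, followed by the value bytes.
    return struct.pack(">I", len(key)) + key + value


def extract_key(encoded: bytes) -> bytes:
    # What a runner could do to partition by key: read the length prefix
    # and slice out the key, never touching the value bytes.
    (key_len,) = struct.unpack_from(">I", encoded, 0)
    return encoded[4:4 + key_len]


def decode_kv(encoded: bytes):
    # Full decode, for completeness: everything after the key is the value.
    key = extract_key(encoded)
    return key, encoded[4 + len(key):]
```

With the key-aware form, two elements that share a key yield identical key bytes regardless of their values, which is the property a construct like `keyBy(key)` would need. With the opaque form, the runner only sees two different blobs.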

On Wed, Oct 17, 2018 at 4:17 PM Maximilian Michels <[email protected]> wrote:
> Type hints turn out to be not so predictable:
>
> 1) WORKS
> p | beam.Impulse() \
>   | beam.ParDo(MyCreate()).with_output_types(typehints.KV[K, V]) \
>   | "statefulParDo" >> beam.ParDo(AddIndex())
>
> 2) DOES NOT (no KvCoder)
> p | beam.Create(inputs).with_output_types(typehints.KV[K, V]) \
>   | "statefulParDo" >> beam.ParDo(AddIndex())
>
> Do you know a way to make 2) work, i.e. set the KvCoder for the Create?
>
>
> In the first example, the Create runs in a ParDo, in the second example
> On 17.10.18 15:34, Maximilian Michels wrote:
> > Thanks Robert. I was able to get it working by adding this to the
> > transform before my stateful DoFn:
> >
> > .with_output_types(typehints.KV[K, V])
> >
> > For some reason `.with_input_types(typehints.KV[K, V])` on my stateful
> > DoFn did not work.
> >
> > Until we enforce KV during pipeline construction, we will have to throw
> > an informative exception in the Runner.
> >
> > On 17.10.18 15:03, Robert Bradshaw wrote:
> >> Yes, we should be enforcing keyness (and use of KeyCoder with)
> >> stateful DoFns, similar to what we do for GBKs. See e.g.
> >> https://github.com/apache/beam/pull/6304#issuecomment-421935375
> >>
> >> (This possibly relates to a long-standing issue that the coder
> >> inference should be moved up into construction, or at least before we
> >> pass the graph to the runner.)
> >>
> >> On Wed, Oct 17, 2018 at 2:52 PM Maximilian Michels <[email protected]> wrote:
> >>
> >>     Hi everyone,
> >>
> >>     While integrating portable state with the FlinkRunner, I hit a problem
> >>     and wanted to get your opinion.
> >>
> >>     Stateful DoFns require their input to be KV records. The reason for this
> >>     is that state is isolated by key. The (non-portable) FlinkRunner uses
> >>     Flink's `keyBy(key)` construct to partition state by key [1].
> >>
> >>     That works fine for portable Java pipelines where we enforce the `KV`
> >>     class for Stateful DoFns. After running tests with the Python SDK, I
> >>     came to the conclusion that tuples, e.g. `(key, value)` which are used
> >>     for KV functionality, do not go through the KvCoder but are encoded
> >>     using a byte array encoder.
> >>
> >>     How do we infer the key in the Runner from an opaque sequence of bytes?
> >>     Should we also require the KvCoder for stateful DoFns in the Python SDK?
> >>
> >>     Thanks,
> >>     Max
> >>
> >>     [1]
> >>     https://github.com/apache/beam/blob/22f59deaf2a0aa77d98f2f024ce4b2a399014382/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L471
