I think Create() simply ignores any type hints it's given (which should be
fixed).

On Wed, Oct 17, 2018 at 4:17 PM Maximilian Michels <[email protected]> wrote:

> Type hints turn out to be not so predictable:
>
> 1) WORKS
>    p | beam.Impulse() \
>      | beam.ParDo(MyCreate()).with_output_types(typehints.KV[K, V]) \
>      | "statefulParDo" >> beam.ParDo(AddIndex())
>
> 2) DOES NOT (no KvCoder)
>    p | beam.Create(inputs).with_output_types(typehints.KV[K, V]) \
>      | "statefulParDo" >> beam.ParDo(AddIndex())
>
> Do you know a way to make 2) work, i.e. set the KvCoder for the Create?
>
>
> In the first example, the Create runs in a ParDo, in the second example
> On 17.10.18 15:34, Maximilian Michels wrote:
> > Thanks Robert. I was able to get it working by adding this to the
> > transform before my stateful DoFn:
> >
> >    .with_output_types(typehints.KV[K, V])
> >
> > For some reason `.with_input_types(typehints.KV[K, V])` on my stateful
> > DoFn did not work.
> >
> > Until we enforce KV during pipeline construction, we will have to throw
> > an informative exception in the Runner.
> >
> > On 17.10.18 15:03, Robert Bradshaw wrote:
> >> Yes, we should be enforcing keyness (and the use of KeyCoder) with
> >> stateful DoFns, similar to what we do for GBKs. See e.g.
> >> https://github.com/apache/beam/pull/6304#issuecomment-421935375
> >>
> >> (This possibly relates to a long-standing issue that the coder
> >> inference should be moved up into construction, or at least before we
> >> pass the graph to the runner.)
> >>
> >> On Wed, Oct 17, 2018 at 2:52 PM Maximilian Michels <[email protected]> wrote:
> >>
> >>     Hi everyone,
> >>
> >>     While integrating portable state with the FlinkRunner, I hit a
> >>     problem and wanted to get your opinion.
> >>
> >>     Stateful DoFns require their input to be KV records. The reason for
> >>     this is that state is isolated by key. The (non-portable) FlinkRunner
> >>     uses Flink's `keyBy(key)` construct to partition state by key [1].
> >>
> >>     That works fine for portable Java pipelines where we enforce the
> >>     `KV` class for Stateful DoFns. After running tests with the Python
> >>     SDK, I came to the conclusion that tuples, e.g. `(key, value)` which
> >>     are used for KV functionality, do not go through the KvCoder but are
> >>     encoded using a byte array encoder.
> >>
> >>     How do we infer the key in the Runner from an opaque sequence of
> >>     bytes? Should we also require the KvCoder for stateful DoFns in the
> >>     Python SDK?
> >>
> >>     Thanks,
> >>     Max
> >>
> >>     [1]
> >>     https://github.com/apache/beam/blob/22f59deaf2a0aa77d98f2f024ce4b2a399014382/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L471
> >>
> >>
>
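
To illustrate the question at the bottom of the thread (how a runner can find
the key in an opaque sequence of bytes), here is a minimal sketch in plain
Python. It is a toy encoding, not Beam's actual KvCoder wire format, and the
names encode_kv, extract_key_bytes and encode_opaque are hypothetical helpers
invented for this example. The assumption is only that a KV-aware encoding
keeps the key bytes separable from the value bytes, while a fallback encoding
serializes the whole tuple as one blob.

```python
import pickle
import struct


def encode_kv(key: str, value) -> bytes:
    """Toy KV encoding (assumption, not Beam's format):
    4-byte big-endian key length, then the key bytes, then the value bytes."""
    key_bytes = key.encode("utf-8")
    return struct.pack(">I", len(key_bytes)) + key_bytes + pickle.dumps(value)


def extract_key_bytes(encoded: bytes) -> bytes:
    """What a runner could do with such an encoding: slice out the key
    bytes without ever decoding the value."""
    (key_len,) = struct.unpack_from(">I", encoded, 0)
    return encoded[4:4 + key_len]


def encode_opaque(element) -> bytes:
    """Opaque fallback: the whole (key, value) tuple becomes one blob,
    so there is no key boundary the runner could rely on."""
    return pickle.dumps(element)


# Two elements with the same key but different values.
kv_a = encode_kv("user-1", {"clicks": 3})
kv_b = encode_kv("user-1", {"clicks": 7})

# The key bytes are identical, so the runner can partition state by them.
assert extract_key_bytes(kv_a) == extract_key_bytes(kv_b) == b"user-1"

# The opaque blobs differ as a whole and expose no key to partition on.
assert encode_opaque(("user-1", {"clicks": 3})) != encode_opaque(("user-1", {"clicks": 7}))
```

With the structured encoding, the key bytes can be handed to something like
Flink's `keyBy(key)` without the runner understanding the value. With the
opaque blob, the runner would have to decode the element using SDK-specific
logic, which is why the thread suggests requiring a KV coder for stateful
DoFns.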
