[
https://issues.apache.org/jira/browse/BEAM-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ahmet Altay reassigned BEAM-7678:
---------------------------------
Assignee: Enrico Canzonieri
> typehints with_output_types annotation doesn't work for stateful DoFn
> ----------------------------------------------------------------------
>
> Key: BEAM-7678
> URL: https://issues.apache.org/jira/browse/BEAM-7678
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Affects Versions: 2.13.0
> Reporter: Enrico Canzonieri
> Assignee: Enrico Canzonieri
> Priority: Minor
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> The output types typehints seem to be ignored when using a stateful DoFn, but
> the same typehint works perfectly when used without state. This issue
> prevents a custom Coder from being used because Beam will default to one of
> the {{FastCoders}} (I believe Pickle).
> Example code:
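> {code}
> from apache_beam import DoFn, typehints
> from apache_beam.coders import VarIntCoder
> from apache_beam.transforms.userstate import BagStateSpec
>
> @typehints.with_output_types(Message)  # Message is a user-defined class
> class StatefulDoFn(DoFn):
>   COUNTER_STATE = BagStateSpec('counter', VarIntCoder())
>
>   def process(self, element, counter=DoFn.StateParam(COUNTER_STATE)):
>     (key, messages) = element
>     newMessage = Message()
>     return [newMessage]
> {code}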
> The example code simply defines a stateful DoFn in Python. The runner used
> is the Flink 1.6.4 portable runner.
> Finally, overriding {{infer_output_type}} to return a
> {{typehints.List[Message]}} solves the issue.
> Looking at the code, it seems to me that in
> [https://github.com/apache/beam/blob/v2.13.0/sdks/python/apache_beam/pipeline.py#L643]
> we do not take the typehints into account.
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)