[ 
https://issues.apache.org/jira/browse/BEAM-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmet Altay reassigned BEAM-7678:
---------------------------------

    Assignee: Enrico Canzonieri

> typehints with_output_types annotation doesn't work for stateful DoFn 
> ----------------------------------------------------------------------
>
>                 Key: BEAM-7678
>                 URL: https://issues.apache.org/jira/browse/BEAM-7678
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.13.0
>            Reporter: Enrico Canzonieri
>            Assignee: Enrico Canzonieri
>            Priority: Minor
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> The output types typehints seem to be ignored when using a stateful DoFn, but 
> the same typehint works perfectly when used without state. This issue 
> prevents a custom Coder from being used because Beam will default to one of 
> the {{FastCoders}} (I believe Pickle).
> Example code:
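> {code}
> from apache_beam import DoFn, typehints
> from apache_beam.coders import VarIntCoder
> from apache_beam.transforms.userstate import BagStateSpec
>
> # Message is the user-defined output type (definition not shown).
> @typehints.with_output_types(Message)
> class StatefulDoFn(DoFn):
>     COUNTER_STATE = BagStateSpec('counter', VarIntCoder())
>
>     def process(self, element, counter=DoFn.StateParam(COUNTER_STATE)):
>         (key, messages) = element
>         newMessage = Message()
>         return [newMessage]
> {code}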
> The example code just defines a stateful DoFn in Python. The runner used 
> is the Flink 1.6.4 portable runner.
> Finally, overriding {{infer_output_type}} to return a 
> {{typehints.List[Message]}} solves the issue.
> Looking at the code, it seems to me that in 
> [https://github.com/apache/beam/blob/v2.13.0/sdks/python/apache_beam/pipeline.py#L643]
>  we do not take the typehints into account.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
