Enrico Canzonieri created BEAM-7678:
---------------------------------------

             Summary: typehints with_output_types annotation doesn't work for stateful DoFn
                 Key: BEAM-7678
                 URL: https://issues.apache.org/jira/browse/BEAM-7678
             Project: Beam
          Issue Type: Bug
          Components: sdk-py-core
    Affects Versions: 2.13.0
            Reporter: Enrico Canzonieri


The output type hints appear to be ignored when using a stateful DoFn, while the
same type hint works as expected on a DoFn without state. This prevents a custom
Coder from being used, because Beam falls back to one of the {{FastCoders}}
(I believe Pickle).

Example code:
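{code}
from apache_beam import DoFn, typehints
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.userstate import BagStateSpec

# Message is the application's own message type (definition not shown).
@typehints.with_output_types(Message)
class StatefulDoFn(DoFn):

    COUNTER_STATE = BagStateSpec('counter', VarIntCoder())

    def process(self, element, counter=DoFn.StateParam(COUNTER_STATE)):
        # Stateful DoFns receive keyed input: (key, value) pairs.
        key, messages = element
        new_message = Message()
        return [new_message]
{code}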
The example simply defines a stateful DoFn in Python. The runner used is the
portable Flink runner, with Flink 1.6.4.

Finally, overriding {{infer_output_type}} to return
{{typehints.List[Message]}} works around the issue.

Looking at the code, it seems to me that the declared type hints are not taken
into account in
[https://github.com/apache/beam/blob/v2.13.0/sdks/python/apache_beam/pipeline.py#L643].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)