Enrico Canzonieri created BEAM-7678:
---------------------------------------
Summary: typehints with_output_types annotation doesn't work for
stateful DoFn
Key: BEAM-7678
URL: https://issues.apache.org/jira/browse/BEAM-7678
Project: Beam
Issue Type: Bug
Components: sdk-py-core
Affects Versions: 2.13.0
Reporter: Enrico Canzonieri
The output type hints appear to be ignored when using a stateful DoFn, although
the same type hint works when the DoFn has no state. This prevents a custom
Coder from being used, because Beam falls back to one of the
{{FastCoders}} (I believe Pickle).
Example code:
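{code}
from apache_beam import DoFn, typehints
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.userstate import BagStateSpec

# Message is the reporter's own type (definition not shown).
@typehints.with_output_types(Message)
class StatefulDoFn(DoFn):
    COUNTER_STATE = BagStateSpec('counter', VarIntCoder())

    def process(self, element, counter=DoFn.StateParam(COUNTER_STATE)):
        (key, messages) = element
        newMessage = Message()
        return [newMessage]
{code}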
The example code simply defines a stateful DoFn in Python. The runner used is
the portable Flink runner (Flink 1.6.4).
As a workaround, overriding {{infer_output_type}} to return
{{typehints.List[Message]}} solves the issue.
Looking at the code, it seems to me that in
[https://github.com/apache/beam/blob/v2.13.0/sdks/python/apache_beam/pipeline.py#L643]
we do not take the typehints into account.