[
https://issues.apache.org/jira/browse/BEAM-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16878137#comment-16878137
]
Enrico Canzonieri commented on BEAM-7678:
-----------------------------------------
From my understanding, when we assign type hints to a DoFn, that should result
in {{output.element_type}} being set to the proper type in:
[https://github.com/apache/beam/blob/v2.13.0/sdks/python/apache_beam/pipeline.py#L643]
I experimented with the following change, which seems to work:
{code}
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index a0e8a72759..fa67242c36 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -640,7 +640,7 @@ class Pipeline(object):
       if len(transform_node.outputs) == 1:
         # The runner often has expectations about the output types as well.
         output, = transform_node.outputs.values()
-        output.element_type = transform_node.transform.infer_output_type(
+        output.element_type = output.element_type or transform_node.transform.infer_output_type(
             pcoll.element_type)
{code}
I'm not sure whether this will introduce any regression though.
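To illustrate the intent of the change above, here is a minimal stand-alone sketch in plain Python. It does not use Beam: {{FakeOutput}} and {{resolve_element_type}} are hypothetical stand-ins, and the strings only play the role of type hints. It shows that an element type that is already set is kept, and inference only runs as a fallback.

```python
# Stand-alone sketch. FakeOutput and resolve_element_type are hypothetical
# stand-ins for illustration; they are not part of Beam.
class FakeOutput(object):
    def __init__(self, element_type=None):
        self.element_type = element_type


def resolve_element_type(output, infer):
    # Keep an element type that is already set; otherwise fall back to
    # whatever the inference callable returns.
    return output.element_type or infer()


declared = FakeOutput(element_type="Message")  # type hint was declared
undeclared = FakeOutput()                      # nothing declared

print(resolve_element_type(declared, lambda: "Any"))
print(resolve_element_type(undeclared, lambda: "Any"))
```

One thing to keep in mind with {{or}}: any falsy value counts as "not set", so this pattern assumes that a declared element type is always truthy.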
> typehints with_output_types annotation doesn't work for stateful DoFn
> ----------------------------------------------------------------------
>
> Key: BEAM-7678
> URL: https://issues.apache.org/jira/browse/BEAM-7678
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Affects Versions: 2.13.0
> Reporter: Enrico Canzonieri
> Priority: Minor
>
> The output type hints seem to be ignored when using a stateful DoFn, but
> the same type hint works perfectly when used without state. This issue
> prevents a custom Coder from being used because Beam will default to one of
> the {{FastCoders}} (I believe Pickle).
> Example code:
> {code}
> @typehints.with_output_types(Message)
> class StatefulDoFn(DoFn):
>   COUNTER_STATE = BagStateSpec('counter', VarIntCoder())
>
>   def process(self, element, counter=DoFn.StateParam(COUNTER_STATE)):
>     (key, messages) = element
>     newMessage = Message()
>     return [newMessage]
> {code}
> The example code just defines a stateful DoFn in Python. The runner used
> is the Flink 1.6.4 portable runner.
> Finally, overriding {{infer_output_type}} to return a
> {{typehints.List[Message]}} solves the issue.
> Looking at the code, it seems to me that in
> [https://github.com/apache/beam/blob/v2.13.0/sdks/python/apache_beam/pipeline.py#L643]
> we do not take the typehints into account.