[ 
https://issues.apache.org/jira/browse/BEAM-8782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17121800#comment-17121800
 ] 

Kenneth Knowles commented on BEAM-8782:
---------------------------------------

This issue is assigned but has not received an update in 30 days so it has been 
labeled "stale-assigned". If you are still working on the issue, please give an 
update and remove the label. If you are no longer working on the issue, please 
unassign so someone else may work on it. In 7 days the issue will be 
automatically unassigned.

> Python typehints: with_output_types breaks multi-output dofns
> -------------------------------------------------------------
>
>                 Key: BEAM-8782
>                 URL: https://issues.apache.org/jira/browse/BEAM-8782
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Udi Meiri
>            Assignee: Udi Meiri
>            Priority: P2
>              Labels: stale-assigned
>
> {code}
>   def test_typed_multi_pardo(self):
>     p = TestPipeline()
>     res = (p
>            | beam.Create([1, 2, 3])
>            | beam.Map(lambda e: e).with_outputs().with_output_types(int))
>     self.assertIsNotNone(res[None].element_type)
>     res_main = (res[None]
>                 | 'id_none' >> beam.ParDo(lambda e: [e]).with_input_types(int))
>     assert_that(res_main, equal_to([1, 2, 3]), label='none_check')
>     p.run()
> {code}
> Fails with:
> {code}
> typed_pipeline_test.py:212: 
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ 
> ../pvalue.py:113: in __or__
>     return self.pipeline.apply(ptransform, self)
> ../pipeline.py:528: in apply
>     transform.type_check_outputs(pvalueish_result)
> ../transforms/ptransform.py:386: in type_check_outputs
>     self.type_check_inputs_or_outputs(pvalueish, 'output')
> ../transforms/ptransform.py:401: in type_check_inputs_or_outputs
>     if pvalue_.element_type is None:
> ../pvalue.py:241: in __getattr__
>     return self[tag]
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ 
> self = <DoOutputsTuple main_tag=None tags=() transform=<ParDo(PTransform) label=[Map(<lambda at typed_pipeline_test.py:212>)]> at 0x7fa9513f3048>
> tag = 'element_type'
>     def __getitem__(self, tag):
>       # Accept int tags so that we can look at Partition tags with the
>       # same ints that we used in the partition function.
>       # TODO(gildea): Consider requiring string-based tags everywhere.
>       # This will require a partition function that does not return ints.
>       if isinstance(tag, int):
>         tag = str(tag)
>       if tag == self._main_tag:
>         tag = None
>       elif self._tags and tag not in self._tags:
>         raise ValueError(
>             "Tag '%s' is neither the main tag '%s' "
>             "nor any of the tags %s" % (
>                 tag, self._main_tag, self._tags))
>       # Check if we accessed this tag before.
>       if tag in self._pcolls:
>         return self._pcolls[tag]
>     
>       if tag is not None:
>         self._transform.output_tags.add(tag)
>         pcoll = PCollection(self._pipeline, tag=tag, element_type=typehints.Any)
>         # Transfer the producer from the DoOutputsTuple to the resulting
>         # PCollection.
> >       pcoll.producer = self.producer.parts[0]
> E       AttributeError: 'NoneType' object has no attribute 'parts'
> ../pvalue.py:266: AttributeError
> {code}
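
The traceback suggests the mechanism: the type check reads `element_type` on the `DoOutputsTuple`, the lookup falls through to `__getattr__`, which treats the attribute name as an output tag, and `__getitem__` then dereferences a producer that is still `None`. Below is a minimal sketch of that pattern using a hypothetical stand-in class. `FakeOutputsTuple` is not Beam code, and the dunder guard in `__getattr__` is an assumption added only for the sketch.

```python
class FakeOutputsTuple:
    """Hypothetical stand-in for DoOutputsTuple; this is not Beam code."""

    def __init__(self, producer=None):
        self._main_tag = None
        self._tags = ()          # empty, so the "unknown tag" check is skipped
        self._pcolls = {}
        self.producer = producer  # assumed to still be None during type checking

    def __getattr__(self, tag):
        # Python calls this only when normal attribute lookup fails.
        # Guard dunder names (an assumption for this sketch).
        if tag.startswith('__') and tag.endswith('__'):
            raise AttributeError(tag)
        # Any other missing attribute, e.g. 'element_type', is treated as a tag.
        return self[tag]

    def __getitem__(self, tag):
        if self._tags and tag not in self._tags:
            raise ValueError('unknown tag %r' % (tag,))
        if tag in self._pcolls:
            return self._pcolls[tag]
        # Mirrors `self.producer.parts[0]` from the traceback.
        return self.producer.parts[0]


error_message = None
outputs = FakeOutputsTuple()
try:
    # Same access the type checker performs: `pvalue_.element_type`.
    outputs.element_type
except AttributeError as exc:
    error_message = str(exc)

print(error_message)
```

In this sketch the failure comes from the attribute access on the tuple itself, not from the user's function, which is consistent with the stack trace in the report.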



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
