[ https://issues.apache.org/jira/browse/BEAM-4132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chuan Yu Foo updated BEAM-4132:
-------------------------------
Description:
TLDR: if you have a multi-output DoFn, then the non-main PCollections will
incorrectly have their element types set to None. This affects type checking
for pipelines involving these PCollections.
Minimal example:
{code}
import apache_beam as beam


class TripleDoFn(beam.DoFn):
  def process(self, elem):
    # Main output: every element, untagged.
    yield elem
    # Additional (tagged) outputs.
    if elem % 2 == 0:
      yield beam.pvalue.TaggedOutput('ten_times', elem * 10)
    if elem % 3 == 0:
      yield beam.pvalue.TaggedOutput('hundred_times', elem * 100)


@beam.typehints.with_input_types(int)
@beam.typehints.with_output_types(int)
class MultiplyBy(beam.DoFn):
  def __init__(self, multiplier):
    self._multiplier = multiplier

  def process(self, elem):
    # process() must produce an iterable, so yield rather than return.
    yield elem * self._multiplier


def main():
  with beam.Pipeline() as p:
    x, a, b = (
        p
        | 'Create' >> beam.Create([1, 2, 3])
        | 'TripleDo' >> beam.ParDo(TripleDoFn()).with_outputs(
            'ten_times', 'hundred_times', main='main_output'))
    # Consuming a tagged output triggers the type hint violation.
    _ = a | 'MultiplyBy2' >> beam.ParDo(MultiplyBy(2))


if __name__ == '__main__':
  main()
{code}
Running this yields the following error:
{noformat}
apache_beam.typehints.decorators.TypeCheckError: Type hint violation for
'MultiplyBy2': requires <type 'int'> but got None for elem
{noformat}
Replacing {{a}} with {{b}} yields the same error. Replacing {{a}} with {{x}}
instead yields the following error:
{noformat}
apache_beam.typehints.decorators.TypeCheckError: Type hint violation for
'MultiplyBy2': requires <type 'int'> but got Union[TaggedOutput, int] for elem
{noformat}
I would expect Beam to correctly infer that {{a}} and {{b}} have element types
of {{int}} rather than {{None}}, and I would also expect Beam to correctly
figure out that the element types of {{x}} are compatible with {{int}}.
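To make the expectation concrete, here is a minimal pure-Python sketch of what {{TripleDoFn.process}} emits for the input {{[1, 2, 3]}} once the mixed stream is split by tag. It is not Beam's implementation: the {{TaggedOutput}} namedtuple is a hypothetical stand-in for {{beam.pvalue.TaggedOutput}}, and {{triple_process}} and {{route_outputs}} are illustrative names that do not exist in Beam.

```python
from collections import defaultdict, namedtuple

# Hypothetical stand-in for beam.pvalue.TaggedOutput so this runs without Beam.
TaggedOutput = namedtuple('TaggedOutput', ['tag', 'value'])


def triple_process(elem):
    """Mirrors the body of TripleDoFn.process from the example."""
    yield elem
    if elem % 2 == 0:
        yield TaggedOutput('ten_times', elem * 10)
    if elem % 3 == 0:
        yield TaggedOutput('hundred_times', elem * 100)


def route_outputs(elements, main_tag='main_output'):
    """Split the generator's mixed output stream into one list per tag."""
    outputs = defaultdict(list)
    for elem in elements:
        for result in triple_process(elem):
            if isinstance(result, TaggedOutput):
                # Tagged results are unwrapped before reaching their output.
                outputs[result.tag].append(result.value)
            else:
                outputs[main_tag].append(result)
    return dict(outputs)


routed = route_outputs([1, 2, 3])
# Collect the set of Python type names seen on each output.
element_types = {
    tag: {type(value).__name__ for value in values}
    for tag, values in routed.items()
}
print(routed)
print(element_types)
```

In this sketch every output, main and tagged alike, carries only {{int}} values after routing, which is the element type I would expect Beam to infer for {{x}}, {{a}} and {{b}}.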
> Element type inference doesn't work for multi-output DoFns
> ----------------------------------------------------------
>
> Key: BEAM-4132
> URL: https://issues.apache.org/jira/browse/BEAM-4132
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Affects Versions: 2.4.0
> Reporter: Chuan Yu Foo
> Assignee: Ahmet Altay
> Priority: Major