Jongbin Park created BEAM-13040:
-----------------------------------
Summary: AsIter side input is not correctly recognized as a dependency.
Key: BEAM-13040
URL: https://issues.apache.org/jira/browse/BEAM-13040
Project: Beam
Issue Type: Bug
Components: sdk-py-core
Environment: Linux Debian 5.10 x86_64
Python 3.8.11
Reporter: Jongbin Park
The error occurs on current master (HEAD). It does not occur on the latest release (2.33.0).
Example to reproduce:
{code:python}
import unittest

import apache_beam as beam


class LoggingFn(beam.DoFn):
    def __init__(self, name):
        self._name = name

    def process(self, element, *side_inputs):
        # Log this step's name together with the contents of each side input.
        print(f'Running {self._name} '
              f'(side inputs: {[list(s) for s in side_inputs]})')
        return [self._name]


class BeamDagTest(unittest.TestCase):
    def test_dag(self):
        with beam.Pipeline() as p:
            root = p | 'CreateRoot' >> beam.Create([None])
            example_gen = root | 'CsvExampleGen' >> beam.ParDo(
                LoggingFn('CsvExampleGen'),
            )
            statistics_gen = root | 'StatisticsGen' >> beam.ParDo(
                LoggingFn('StatisticsGen'),
                # AsIter to specify upstream task dependency.
                beam.pvalue.AsIter(example_gen),
            )
            schema_gen = root | 'SchemaGen' >> beam.ParDo(
                LoggingFn('SchemaGen'),
                beam.pvalue.AsIter(statistics_gen),
            )
            example_validator = root | 'ExampleValidator' >> beam.ParDo(
                LoggingFn('ExampleValidator'),
                beam.pvalue.AsIter(statistics_gen),
                beam.pvalue.AsIter(schema_gen),
            )
            transform = root | 'Transform' >> beam.ParDo(
                LoggingFn('Transform'),
                beam.pvalue.AsIter(example_gen),
                beam.pvalue.AsIter(schema_gen),
            )
            trainer = root | 'Trainer' >> beam.ParDo(
                LoggingFn('Trainer'),
                beam.pvalue.AsIter(example_gen),
                beam.pvalue.AsIter(schema_gen),
                beam.pvalue.AsIter(transform),
            )
            model_resolver = root | 'latest_blessed_model_resolver' >> beam.ParDo(
                LoggingFn('latest_blessed_model_resolver'),
            )
            evaluator = root | 'Evaluator' >> beam.ParDo(
                LoggingFn('Evaluator'),
                beam.pvalue.AsIter(example_gen),
                beam.pvalue.AsIter(trainer),
                beam.pvalue.AsIter(model_resolver),
            )
            pusher = root | 'Pusher' >> beam.ParDo(
                LoggingFn('Pusher'),
                beam.pvalue.AsIter(trainer),
                beam.pvalue.AsIter(evaluator),
            )


if __name__ == '__main__':
    unittest.main()
{code}
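The AsIter side inputs in the repro define a DAG between the steps. As a minimal sketch (the DEPS dict is transcribed by hand from the ParDo arguments, and topological_order is a hypothetical helper, not a Beam API), Kahn's algorithm produces one execution order in which every step follows all of its upstream steps:

{code:python}
from collections import deque
from typing import Dict, List

# Upstream steps each ParDo reads via beam.pvalue.AsIter in the repro
# (hypothetical transcription, in ParDo argument order).
DEPS: Dict[str, List[str]] = {
    'CsvExampleGen': [],
    'StatisticsGen': ['CsvExampleGen'],
    'SchemaGen': ['StatisticsGen'],
    'ExampleValidator': ['StatisticsGen', 'SchemaGen'],
    'Transform': ['CsvExampleGen', 'SchemaGen'],
    'Trainer': ['CsvExampleGen', 'SchemaGen', 'Transform'],
    'latest_blessed_model_resolver': [],
    'Evaluator': ['CsvExampleGen', 'Trainer', 'latest_blessed_model_resolver'],
    'Pusher': ['Trainer', 'Evaluator'],
}


def topological_order(deps: Dict[str, List[str]]) -> List[str]:
    """Kahn's algorithm: return one valid order, or raise on a cycle."""
    # Number of upstream steps not yet scheduled, per step.
    indegree = {node: len(ups) for node, ups in deps.items()}
    # Reverse edges: which steps consume each step's output.
    downstream: Dict[str, List[str]] = {node: [] for node in deps}
    for node, ups in deps.items():
        for up in ups:
            downstream[up].append(node)
    ready = deque(node for node, d in indegree.items() if d == 0)
    order: List[str] = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for child in downstream[node]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    if len(order) != len(deps):
        raise ValueError('dependency cycle detected')
    return order


print(topological_order(DEPS))
{code}

Any order satisfying the dependencies is acceptable; for this graph the sketch happens to yield the same order as the 2.33.0 output.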
According to the AsIter
[documentation|https://github.com/apache/beam/blob/64ec15fa2208d8f9b5ca5653866e1992fd07f7dc/sdks/python/apache_beam/pvalue.py#L527],
the entire PCollection is made available as a side input before the ParDo runs, so AsIter can be used to specify a task dependency. However, this no longer works on current master (71d7213d98): some steps run before their upstream steps and receive empty side inputs.
Output with apache-beam==2.33.0:
{code:java}
Running CsvExampleGen (side inputs: [])
Running latest_blessed_model_resolver (side inputs: [])
Running StatisticsGen (side inputs: [['CsvExampleGen']])
Running SchemaGen (side inputs: [['StatisticsGen']])
Running ExampleValidator (side inputs: [['StatisticsGen'], ['SchemaGen']])
Running Transform (side inputs: [['CsvExampleGen'], ['SchemaGen']])
Running Trainer (side inputs: [['CsvExampleGen'], ['SchemaGen'], ['Transform']])
Running Evaluator (side inputs: [['CsvExampleGen'], ['Trainer'], ['latest_blessed_model_resolver']])
Running Pusher (side inputs: [['Trainer'], ['Evaluator']]){code}
Output with apache-beam installed from 71d7213d98 (origin/master):
{code:java}
Running CsvExampleGen (side inputs: [])
Running latest_blessed_model_resolver (side inputs: [])
Running StatisticsGen (side inputs: [['CsvExampleGen']])
Running Pusher (side inputs: [[], []])
Running Evaluator (side inputs: [['CsvExampleGen'], [], ['latest_blessed_model_resolver']])
Running SchemaGen (side inputs: [['StatisticsGen']])
Running Trainer (side inputs: [['CsvExampleGen'], ['SchemaGen'], []])
Running ExampleValidator (side inputs: [['StatisticsGen'], ['SchemaGen']])
Running Transform (side inputs: [['CsvExampleGen'], ['SchemaGen']]){code}
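To make the difference between the two outputs explicit, here is a minimal sketch of a log checker. It is a hypothetical helper (find_violations, DEPS and MASTER_LOG are illustrative names, not Beam APIs) that parses the lines printed by LoggingFn and flags any step that printed before one of its upstream steps or received an empty side input. Print order is only a rough proxy for execution order, so treat the "ran before" findings as indicative.

{code:python}
import ast
import re
from typing import Dict, List

# Upstream steps each ParDo reads via AsIter, in argument order (from the repro).
DEPS: Dict[str, List[str]] = {
    'CsvExampleGen': [],
    'StatisticsGen': ['CsvExampleGen'],
    'SchemaGen': ['StatisticsGen'],
    'ExampleValidator': ['StatisticsGen', 'SchemaGen'],
    'Transform': ['CsvExampleGen', 'SchemaGen'],
    'Trainer': ['CsvExampleGen', 'SchemaGen', 'Transform'],
    'latest_blessed_model_resolver': [],
    'Evaluator': ['CsvExampleGen', 'Trainer', 'latest_blessed_model_resolver'],
    'Pusher': ['Trainer', 'Evaluator'],
}

# Matches lines printed by LoggingFn, e.g. "Running Pusher (side inputs: [[], []])".
LINE_RE = re.compile(r'^Running (\S+) \(side inputs: (.*)\)$')


def find_violations(log: List[str], deps: Dict[str, List[str]]) -> List[str]:
    """Flag steps that printed before an upstream step or saw an empty side input."""
    seen = set()
    problems: List[str] = []
    for line in log:
        match = LINE_RE.match(line)
        if match is None:
            continue
        name = match.group(1)
        side_inputs = ast.literal_eval(match.group(2))
        # Side inputs arrive positionally, in the same order as deps[name].
        for upstream, values in zip(deps.get(name, []), side_inputs):
            if upstream not in seen:
                problems.append(f'{name} ran before {upstream}')
            if not values:
                problems.append(f'{name} got an empty side input from {upstream}')
        seen.add(name)
    return problems


# Output observed on master (71d7213d98), copied from the report.
MASTER_LOG = [
    "Running CsvExampleGen (side inputs: [])",
    "Running latest_blessed_model_resolver (side inputs: [])",
    "Running StatisticsGen (side inputs: [['CsvExampleGen']])",
    "Running Pusher (side inputs: [[], []])",
    "Running Evaluator (side inputs: [['CsvExampleGen'], [], ['latest_blessed_model_resolver']])",
    "Running SchemaGen (side inputs: [['StatisticsGen']])",
    "Running Trainer (side inputs: [['CsvExampleGen'], ['SchemaGen'], []])",
    "Running ExampleValidator (side inputs: [['StatisticsGen'], ['SchemaGen']])",
    "Running Transform (side inputs: [['CsvExampleGen'], ['SchemaGen']])",
]

for problem in find_violations(MASTER_LOG, DEPS):
    print(problem)
{code}

On the master output this flags Pusher, Evaluator and Trainer; the same check finds nothing in the 2.33.0 output.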