damccorm opened a new issue, #21228:
URL: https://github.com/apache/beam/issues/21228
The error occurs on current master (HEAD). The latest release (2.33.0) is not
affected.
Example to reproduce:
```
import unittest

import apache_beam as beam


class LoggingFn(beam.DoFn):

  def __init__(self, name):
    self._name = name

  def process(self, element, *side_inputs):
    print(f'Running {self._name} (side inputs: {[list(s) for s in side_inputs]})')
    return [self._name]


class BeamDagTest(unittest.TestCase):

  def test_dag(self):
    with beam.Pipeline() as p:
      root = p | 'CreateRoot' >> beam.Create([None])
      example_gen = root | 'CsvExampleGen' >> beam.ParDo(
          LoggingFn('CsvExampleGen'),
      )
      statistics_gen = root | 'StatisticsGen' >> beam.ParDo(
          LoggingFn('StatisticsGen'),
          beam.pvalue.AsIter(example_gen),  # AsIter to specify upstream task dependency.
      )
      schema_gen = root | 'SchemaGen' >> beam.ParDo(
          LoggingFn('SchemaGen'),
          beam.pvalue.AsIter(statistics_gen),
      )
      example_validator = root | 'ExampleValidator' >> beam.ParDo(
          LoggingFn('ExampleValidator'),
          beam.pvalue.AsIter(statistics_gen),
          beam.pvalue.AsIter(schema_gen),
      )
      transform = root | 'Transform' >> beam.ParDo(
          LoggingFn('Transform'),
          beam.pvalue.AsIter(example_gen),
          beam.pvalue.AsIter(schema_gen),
      )
      trainer = root | 'Trainer' >> beam.ParDo(
          LoggingFn('Trainer'),
          beam.pvalue.AsIter(example_gen),
          beam.pvalue.AsIter(schema_gen),
          beam.pvalue.AsIter(transform),
      )
      model_resolver = root | 'latest_blessed_model_resolver' >> beam.ParDo(
          LoggingFn('latest_blessed_model_resolver'),
      )
      evaluator = root | 'Evaluator' >> beam.ParDo(
          LoggingFn('Evaluator'),
          beam.pvalue.AsIter(example_gen),
          beam.pvalue.AsIter(trainer),
          beam.pvalue.AsIter(model_resolver),
      )
      pusher = root | 'Pusher' >> beam.ParDo(
          LoggingFn('Pusher'),
          beam.pvalue.AsIter(trainer),
          beam.pvalue.AsIter(evaluator),
      )
```
According to the AsIter
[documentation](https://github.com/apache/beam/blob/64ec15fa2208d8f9b5ca5653866e1992fd07f7dc/sdks/python/apache_beam/pvalue.py#L527),
the entire PCollection should be made available as a side input, which means the
PTransform that produces the side input should run before the current PTransform.
We have relied on this behavior to run a DAG of tasks by injecting task
dependencies through side inputs. However, this mechanism does not work properly
on current master (71d7213d98): some tasks run before their upstream tasks and
receive empty side inputs.
Output with apache-beam==2.33.0:
```
Running CsvExampleGen (side inputs: [])
Running latest_blessed_model_resolver (side inputs: [])
Running StatisticsGen (side inputs: [['CsvExampleGen']])
Running SchemaGen (side inputs: [['StatisticsGen']])
Running ExampleValidator (side inputs: [['StatisticsGen'], ['SchemaGen']])
Running Transform (side inputs: [['CsvExampleGen'], ['SchemaGen']])
Running Trainer (side inputs: [['CsvExampleGen'], ['SchemaGen'], ['Transform']])
Running Evaluator (side inputs: [['CsvExampleGen'], ['Trainer'], ['latest_blessed_model_resolver']])
Running Pusher (side inputs: [['Trainer'], ['Evaluator']])
```
Output with apache-beam installed from 71d7213d98 (origin/master):
```
Running CsvExampleGen (side inputs: [])
Running latest_blessed_model_resolver (side inputs: [])
Running StatisticsGen (side inputs: [['CsvExampleGen']])
Running Pusher (side inputs: [[], []])
Running Evaluator (side inputs: [['CsvExampleGen'], [], ['latest_blessed_model_resolver']])
Running SchemaGen (side inputs: [['StatisticsGen']])
Running Trainer (side inputs: [['CsvExampleGen'], ['SchemaGen'], []])
Running ExampleValidator (side inputs: [['StatisticsGen'], ['SchemaGen']])
Running Transform (side inputs: [['CsvExampleGen'], ['SchemaGen']])
```
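To make the difference between the two outputs easier to spot, here is a small standard-library sketch that parses the `Running ...` lines and reports which tasks received an empty side input. It is not part of the original report: the helper name `steps_with_empty_side_inputs` and the regular expression are illustrative assumptions, and the log text is copied from the origin/master output above with wrapped lines joined.

```python
import ast
import re

# Log lines copied from the origin/master (71d7213d98) output in this report.
MASTER_LOG = """\
Running CsvExampleGen (side inputs: [])
Running latest_blessed_model_resolver (side inputs: [])
Running StatisticsGen (side inputs: [['CsvExampleGen']])
Running Pusher (side inputs: [[], []])
Running Evaluator (side inputs: [['CsvExampleGen'], [], ['latest_blessed_model_resolver']])
Running SchemaGen (side inputs: [['StatisticsGen']])
Running Trainer (side inputs: [['CsvExampleGen'], ['SchemaGen'], []])
Running ExampleValidator (side inputs: [['StatisticsGen'], ['SchemaGen']])
Running Transform (side inputs: [['CsvExampleGen'], ['SchemaGen']])
"""

# Hypothetical pattern matching the print() format used by LoggingFn.
LINE_RE = re.compile(r'^Running (\S+) \(side inputs: (.*)\)$')


def steps_with_empty_side_inputs(log):
  """Maps each task name to the positions of its side inputs that were empty."""
  result = {}
  for line in log.splitlines():
    match = LINE_RE.match(line.strip())
    if not match:
      continue  # Skip anything that is not a "Running ..." line.
    name, raw = match.groups()
    side_inputs = ast.literal_eval(raw)  # The logged value is a list literal.
    empty = [i for i, values in enumerate(side_inputs) if not values]
    if empty:
      result[name] = empty
  return result


if __name__ == '__main__':
  print(steps_with_empty_side_inputs(MASTER_LOG))
```

On the master log this prints `{'Pusher': [0, 1], 'Evaluator': [1], 'Trainer': [2]}`, that is, Pusher ran without the Trainer and Evaluator outputs, Evaluator without the Trainer output, and Trainer without the Transform output. Tasks that declare no side inputs, such as CsvExampleGen, are not reported.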
Imported from Jira
[BEAM-13040](https://issues.apache.org/jira/browse/BEAM-13040). Original Jira
may contain additional context.
Reported by: Park.