Jongbin Park created BEAM-13040:
-----------------------------------

             Summary: AsIter side input is not correctly recognized as a dependency.
                 Key: BEAM-13040
                 URL: https://issues.apache.org/jira/browse/BEAM-13040
             Project: Beam
          Issue Type: Bug
          Components: sdk-py-core
         Environment: Linux Debian 5.10 x86_64
Python 3.8.11
            Reporter: Jongbin Park


The error occurs on current master (HEAD). It does not occur on the latest release (2.33.0).

Example to reproduce: 
{code:python}
import unittest

import apache_beam as beam


class LoggingFn(beam.DoFn):

  def __init__(self, name):
    self._name = name

  def process(self, element, *side_inputs):
    print(f'Running {self._name} (side inputs: {[list(s) for s in side_inputs]})')
    return [self._name]


class BeamDagTest(unittest.TestCase):

  def test_dag(self):
    with beam.Pipeline() as p:
      root = p | 'CreateRoot' >> beam.Create([None])
      example_gen = root | 'CsvExampleGen' >> beam.ParDo(
          LoggingFn('CsvExampleGen'),
      )
      statistics_gen = root | 'StatisticsGen' >> beam.ParDo(
          LoggingFn('StatisticsGen'),
          beam.pvalue.AsIter(example_gen),  # AsIter to specify upstream task dependency.
      )
      schema_gen = root | 'SchemaGen' >> beam.ParDo(
          LoggingFn('SchemaGen'),
          beam.pvalue.AsIter(statistics_gen),
      )
      example_validator = root | 'ExampleValidator' >> beam.ParDo(
          LoggingFn('ExampleValidator'),
          beam.pvalue.AsIter(statistics_gen),
          beam.pvalue.AsIter(schema_gen),
      )
      transform = root | 'Transform' >> beam.ParDo(
          LoggingFn('Transform'),
          beam.pvalue.AsIter(example_gen),
          beam.pvalue.AsIter(schema_gen),
      )
      trainer = root | 'Trainer' >> beam.ParDo(
          LoggingFn('Trainer'),
          beam.pvalue.AsIter(example_gen),
          beam.pvalue.AsIter(schema_gen),
          beam.pvalue.AsIter(transform),
      )
      model_resolver = root | 'latest_blessed_model_resolver' >> beam.ParDo(
          LoggingFn('latest_blessed_model_resolver'),
      )
      evaluator = root | 'Evaluator' >> beam.ParDo(
          LoggingFn('Evaluator'),
          beam.pvalue.AsIter(example_gen),
          beam.pvalue.AsIter(trainer),
          beam.pvalue.AsIter(model_resolver),
      )
      pusher = root | 'Pusher' >> beam.ParDo(
          LoggingFn('Pusher'),
          beam.pvalue.AsIter(trainer),
          beam.pvalue.AsIter(evaluator),
      )


if __name__ == '__main__':
  unittest.main(){code}
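The `AsIter` edges in the repro define a small DAG. As a point of reference, here is a minimal pure-Python sketch (no Beam involved) that transcribes those edges by hand into a hypothetical `DEPS` dict and computes one valid execution order with Kahn's algorithm, i.e. an order in which every step runs only after all of the steps it consumes as side inputs. `DEPS` and `topological_order` are illustrative helpers, not part of Beam, and the sketch says nothing about how any Beam runner actually schedules stages.

```python
from collections import deque

# AsIter edges hand-transcribed from the repro (hypothetical helper data):
# step name -> upstream steps it consumes via beam.pvalue.AsIter.
DEPS = {
    'CsvExampleGen': [],
    'StatisticsGen': ['CsvExampleGen'],
    'SchemaGen': ['StatisticsGen'],
    'ExampleValidator': ['StatisticsGen', 'SchemaGen'],
    'Transform': ['CsvExampleGen', 'SchemaGen'],
    'Trainer': ['CsvExampleGen', 'SchemaGen', 'Transform'],
    'latest_blessed_model_resolver': [],
    'Evaluator': ['CsvExampleGen', 'Trainer', 'latest_blessed_model_resolver'],
    'Pusher': ['Trainer', 'Evaluator'],
}


def topological_order(deps):
    """Return one valid execution order for deps using Kahn's algorithm."""
    # Number of upstream steps each step is still waiting for.
    remaining = {step: len(ups) for step, ups in deps.items()}
    # Reverse edges: upstream step -> steps that consume it.
    consumers = {step: [] for step in deps}
    for step, ups in deps.items():
        for up in ups:
            consumers[up].append(step)
    # Start with the steps that have no side-input dependencies.
    ready = deque(step for step, count in remaining.items() if count == 0)
    order = []
    while ready:
        step = ready.popleft()
        order.append(step)
        for down in consumers[step]:
            remaining[down] -= 1
            if remaining[down] == 0:
                ready.append(down)
    if len(order) != len(deps):
        raise ValueError('dependency graph has a cycle')
    return order


if __name__ == '__main__':
    for step in topological_order(DEPS):
        print(step)
```

For this graph the sketch prints CsvExampleGen, latest_blessed_model_resolver, StatisticsGen, SchemaGen, ExampleValidator, Transform, Trainer, Evaluator, Pusher. Other topological orders are equally valid; the point is only the relative order of each step and its upstreams.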
 
According to the AsIter [documentation|https://github.com/apache/beam/blob/64ec15fa2208d8f9b5ca5653866e1992fd07f7dc/sdks/python/apache_beam/pvalue.py#L527], the entire PCollection should be available as a side input before the ParDo runs, so AsIter can be used to express a task dependency. However, this does not work on current master (71d7213d98):

 

Output with apache-beam==2.33.0:
{code:java}
Running CsvExampleGen (side inputs: [])
Running latest_blessed_model_resolver (side inputs: [])
Running StatisticsGen (side inputs: [['CsvExampleGen']])
Running SchemaGen (side inputs: [['StatisticsGen']])
Running ExampleValidator (side inputs: [['StatisticsGen'], ['SchemaGen']])
Running Transform (side inputs: [['CsvExampleGen'], ['SchemaGen']])
Running Trainer (side inputs: [['CsvExampleGen'], ['SchemaGen'], ['Transform']])
Running Evaluator (side inputs: [['CsvExampleGen'], ['Trainer'], ['latest_blessed_model_resolver']])
Running Pusher (side inputs: [['Trainer'], ['Evaluator']]){code}
Output with apache-beam installed from 71d7213d98 (origin/master):
{code:java}
Running CsvExampleGen (side inputs: [])
Running latest_blessed_model_resolver (side inputs: [])
Running StatisticsGen (side inputs: [['CsvExampleGen']])
Running Pusher (side inputs: [[], []])
Running Evaluator (side inputs: [['CsvExampleGen'], [], ['latest_blessed_model_resolver']])
Running SchemaGen (side inputs: [['StatisticsGen']])
Running Trainer (side inputs: [['CsvExampleGen'], ['SchemaGen'], []])
Running ExampleValidator (side inputs: [['StatisticsGen'], ['SchemaGen']])
Running Transform (side inputs: [['CsvExampleGen'], ['SchemaGen']]){code}
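The two outputs above can be compared mechanically. Below is a minimal sketch, assuming that the printed order of the `Running ...` lines reflects execution order and that each upstream step emits exactly one element (true for this repro, since `CreateRoot` creates a single `None` and each `LoggingFn` returns one value). `DEPS`, `LINE_RE` and `find_violations` are hypothetical helpers, not part of Beam. Applied to the master output it flags Trainer, Evaluator and Pusher; applied to the 2.33.0 output it returns an empty list.

```python
import ast
import re

# AsIter edges hand-transcribed from the repro (hypothetical helper data):
# step name -> upstream steps, in the order they are passed to ParDo.
DEPS = {
    'CsvExampleGen': [],
    'StatisticsGen': ['CsvExampleGen'],
    'SchemaGen': ['StatisticsGen'],
    'ExampleValidator': ['StatisticsGen', 'SchemaGen'],
    'Transform': ['CsvExampleGen', 'SchemaGen'],
    'Trainer': ['CsvExampleGen', 'SchemaGen', 'Transform'],
    'latest_blessed_model_resolver': [],
    'Evaluator': ['CsvExampleGen', 'Trainer', 'latest_blessed_model_resolver'],
    'Pusher': ['Trainer', 'Evaluator'],
}

# Matches lines such as: Running Trainer (side inputs: [['CsvExampleGen'], []])
LINE_RE = re.compile(r'^Running (\S+) \(side inputs: (.*)\)$')


def find_violations(log_text, deps):
    """List steps that printed before an upstream step or saw an empty side input."""
    seen = set()
    problems = []
    for line in log_text.strip().splitlines():
        match = LINE_RE.match(line.strip())
        if match is None:
            continue  # Ignore anything that is not a LoggingFn line.
        step = match.group(1)
        side_inputs = ast.literal_eval(match.group(2))
        # Side inputs are printed in the same order as the AsIter arguments.
        for upstream, values in zip(deps.get(step, []), side_inputs):
            if upstream not in seen:
                problems.append(f'{step} ran before {upstream}')
            # Every upstream step in the repro emits exactly one element, so an
            # empty side input means the dependency was not honored.
            if not values:
                problems.append(f'{step} saw an empty side input from {upstream}')
        seen.add(step)
    return problems


if __name__ == '__main__':
    # Two lines copied from the master output above.
    sample = """
Running CsvExampleGen (side inputs: [])
Running Pusher (side inputs: [[], []])
"""
    for problem in find_violations(sample, DEPS):
        print(problem)
```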



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
