[ 
https://issues.apache.org/jira/browse/BEAM-13040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jongbin Park updated BEAM-13040:
--------------------------------
    Description: 
The error is happening at current master (head). It is fine on the latest 
release (2.33.0).

Example to reproduce: 
{code:python}
import unittest

import apache_beam as beam


class LoggingFn(beam.DoFn):

  def __init__(self, name):
    self._name = name

  def process(self, element, *side_inputs):
    print(f'Running {self._name} (side inputs: {[list(s) for s in 
side_inputs]})')
    return [self._name]


class BeamDagTest(unittest.TestCase):

  def test_dag(self):
    with beam.Pipeline() as p:
      root = p | 'CreateRoot' >> beam.Create([None])
      example_gen = root | 'CsvExampleGen' >> beam.ParDo(
          LoggingFn('CsvExampleGen'),
      )
      statistics_gen = root | 'StatisticsGen' >> beam.ParDo(
          LoggingFn('StatisticsGen'),
          beam.pvalue.AsIter(example_gen),  # AsIter to specify upstream task 
dependency.
      )
      schema_gen = root | 'SchemaGen' >> beam.ParDo(
          LoggingFn('SchemaGen'),
          beam.pvalue.AsIter(statistics_gen),
      )
      example_validator = root | 'ExampleValidator' >> beam.ParDo(
          LoggingFn('ExampleValidator'),
          beam.pvalue.AsIter(statistics_gen),
          beam.pvalue.AsIter(schema_gen),
      )
      transform = root | 'Transform' >> beam.ParDo(
          LoggingFn('Transform'),
          beam.pvalue.AsIter(example_gen),
          beam.pvalue.AsIter(schema_gen),
      )
      trainer = root | 'Trainer' >> beam.ParDo(
          LoggingFn('Trainer'),
          beam.pvalue.AsIter(example_gen),
          beam.pvalue.AsIter(schema_gen),
          beam.pvalue.AsIter(transform),
      )
      model_resolver = root | 'latest_blessed_model_resolver' >> beam.ParDo(
          LoggingFn('latest_blessed_model_resolver'),
      )
      evaluator = root | 'Evaluator' >> beam.ParDo(
          LoggingFn('Evaluator'),
          beam.pvalue.AsIter(example_gen),
          beam.pvalue.AsIter(trainer),
          beam.pvalue.AsIter(model_resolver),
      )
      pusher = root | 'Pusher' >> beam.ParDo(
          LoggingFn('Pusher'),
          beam.pvalue.AsIter(trainer),
          beam.pvalue.AsIter(evaluator),
      ){code}
 
 According to AsIter 
[documentation|https://github.com/apache/beam/blob/64ec15fa2208d8f9b5ca5653866e1992fd07f7dc/sdks/python/apache_beam/pvalue.py#L527],
 entire PCollection should be made available as a side input, which means side 
input PTransform should run before the current PTransform. We used to exploit 
this feature to run DAG of tasks by injecting task dependency with side inputs, 
however this mechanism does not work properly in current master (71d7213d98):

Output with apache-beam==2.33.0:
{code:java}
Running CsvExampleGen (side inputs: [])
Running latest_blessed_model_resolver (side inputs: [])
Running StatisticsGen (side inputs: [['CsvExampleGen']])
Running SchemaGen (side inputs: [['StatisticsGen']])
Running ExampleValidator (side inputs: [['StatisticsGen'], ['SchemaGen']])
Running Transform (side inputs: [['CsvExampleGen'], ['SchemaGen']])
Running Trainer (side inputs: [['CsvExampleGen'], ['SchemaGen'], ['Transform']])
Running Evaluator (side inputs: [['CsvExampleGen'], ['Trainer'], 
['latest_blessed_model_resolver']])
Running Pusher (side inputs: [['Trainer'], ['Evaluator']]){code}
Output with apache-beam installed from 71d7213d98 (origin/master):
{code:java}
Running CsvExampleGen (side inputs: [])
Running latest_blessed_model_resolver (side inputs: [])
Running StatisticsGen (side inputs: [['CsvExampleGen']])
Running Pusher (side inputs: [[], []])
Running Evaluator (side inputs: [['CsvExampleGen'], [], 
['latest_blessed_model_resolver']])
Running SchemaGen (side inputs: [['StatisticsGen']])
Running Trainer (side inputs: [['CsvExampleGen'], ['SchemaGen'], []])
Running ExampleValidator (side inputs: [['StatisticsGen'], ['SchemaGen']])
Running Transform (side inputs: [['CsvExampleGen'], ['SchemaGen']]){code}

  was:
The error is happening at current master (head). It is fine on the latest 
release (2.33.0).

Example to reproduce: 
{code:python}
import unittest

import apache_beam as beam


class LoggingFn(beam.DoFn):

  def __init__(self, name):
    self._name = name

  def process(self, element, *side_inputs):
    print(f'Running {self._name} (side inputs: {[list(s) for s in 
side_inputs]})')
    return [self._name]


class BeamDagTest(unittest.TestCase):

  def test_dag(self):
    with beam.Pipeline() as p:
      root = p | 'CreateRoot' >> beam.Create([None])
      example_gen = root | 'CsvExampleGen' >> beam.ParDo(
          LoggingFn('CsvExampleGen'),
      )
      statistics_gen = root | 'StatisticsGen' >> beam.ParDo(
          LoggingFn('StatisticsGen'),
          beam.pvalue.AsIter(example_gen),  # AsIter to specify upstream task 
dependency.
      )
      schema_gen = root | 'SchemaGen' >> beam.ParDo(
          LoggingFn('SchemaGen'),
          beam.pvalue.AsIter(statistics_gen),
      )
      example_validator = root | 'ExampleValidator' >> beam.ParDo(
          LoggingFn('ExampleValidator'),
          beam.pvalue.AsIter(statistics_gen),
          beam.pvalue.AsIter(schema_gen),
      )
      transform = root | 'Transform' >> beam.ParDo(
          LoggingFn('Transform'),
          beam.pvalue.AsIter(example_gen),
          beam.pvalue.AsIter(schema_gen),
      )
      trainer = root | 'Trainer' >> beam.ParDo(
          LoggingFn('Trainer'),
          beam.pvalue.AsIter(example_gen),
          beam.pvalue.AsIter(schema_gen),
          beam.pvalue.AsIter(transform),
      )
      model_resolver = root | 'latest_blessed_model_resolver' >> beam.ParDo(
          LoggingFn('latest_blessed_model_resolver'),
      )
      evaluator = root | 'Evaluator' >> beam.ParDo(
          LoggingFn('Evaluator'),
          beam.pvalue.AsIter(example_gen),
          beam.pvalue.AsIter(trainer),
          beam.pvalue.AsIter(model_resolver),
      )
      pusher = root | 'Pusher' >> beam.ParDo(
          LoggingFn('Pusher'),
          beam.pvalue.AsIter(trainer),
          beam.pvalue.AsIter(evaluator),
      ){code}
 
 According to AsIter 
[documentation|https://github.com/apache/beam/blob/64ec15fa2208d8f9b5ca5653866e1992fd07f7dc/sdks/python/apache_beam/pvalue.py#L527],
 entire PCollection should be made available as a side input before running the 
ParDo, which means it can be used to specify a task dependency, however this 
somehow does not work in current master (71d7213d98):

 

Output with apache-beam==2.33.0:
{code:java}
Running CsvExampleGen (side inputs: [])
Running latest_blessed_model_resolver (side inputs: [])
Running StatisticsGen (side inputs: [['CsvExampleGen']])
Running SchemaGen (side inputs: [['StatisticsGen']])
Running ExampleValidator (side inputs: [['StatisticsGen'], ['SchemaGen']])
Running Transform (side inputs: [['CsvExampleGen'], ['SchemaGen']])
Running Trainer (side inputs: [['CsvExampleGen'], ['SchemaGen'], ['Transform']])
Running Evaluator (side inputs: [['CsvExampleGen'], ['Trainer'], 
['latest_blessed_model_resolver']])
Running Pusher (side inputs: [['Trainer'], ['Evaluator']]){code}
Output with apache-beam installed from 71d7213d98 (origin/master):
{code:java}
Running CsvExampleGen (side inputs: [])
Running latest_blessed_model_resolver (side inputs: [])
Running StatisticsGen (side inputs: [['CsvExampleGen']])
Running Pusher (side inputs: [[], []])
Running Evaluator (side inputs: [['CsvExampleGen'], [], 
['latest_blessed_model_resolver']])
Running SchemaGen (side inputs: [['StatisticsGen']])
Running Trainer (side inputs: [['CsvExampleGen'], ['SchemaGen'], []])
Running ExampleValidator (side inputs: [['StatisticsGen'], ['SchemaGen']])
Running Transform (side inputs: [['CsvExampleGen'], ['SchemaGen']]){code}


> AsIter side input is not correctly recognized as a dependency.
> --------------------------------------------------------------
>
>                 Key: BEAM-13040
>                 URL: https://issues.apache.org/jira/browse/BEAM-13040
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>         Environment: Linux Debian 5.10 x86_64
> Python 3.8.11
>            Reporter: Jongbin Park
>            Priority: P2
>
> The error is happening at current master (head). It is fine on the latest 
> release (2.33.0).
> Example to reproduce: 
> {code:python}
> import unittest
> import apache_beam as beam
> class LoggingFn(beam.DoFn):
>   def __init__(self, name):
>     self._name = name
>   def process(self, element, *side_inputs):
>     print(f'Running {self._name} (side inputs: {[list(s) for s in 
> side_inputs]})')
>     return [self._name]
> class BeamDagTest(unittest.TestCase):
>   def test_dag(self):
>     with beam.Pipeline() as p:
>       root = p | 'CreateRoot' >> beam.Create([None])
>       example_gen = root | 'CsvExampleGen' >> beam.ParDo(
>           LoggingFn('CsvExampleGen'),
>       )
>       statistics_gen = root | 'StatisticsGen' >> beam.ParDo(
>           LoggingFn('StatisticsGen'),
>           beam.pvalue.AsIter(example_gen),  # AsIter to specify upstream task 
> dependency.
>       )
>       schema_gen = root | 'SchemaGen' >> beam.ParDo(
>           LoggingFn('SchemaGen'),
>           beam.pvalue.AsIter(statistics_gen),
>       )
>       example_validator = root | 'ExampleValidator' >> beam.ParDo(
>           LoggingFn('ExampleValidator'),
>           beam.pvalue.AsIter(statistics_gen),
>           beam.pvalue.AsIter(schema_gen),
>       )
>       transform = root | 'Transform' >> beam.ParDo(
>           LoggingFn('Transform'),
>           beam.pvalue.AsIter(example_gen),
>           beam.pvalue.AsIter(schema_gen),
>       )
>       trainer = root | 'Trainer' >> beam.ParDo(
>           LoggingFn('Trainer'),
>           beam.pvalue.AsIter(example_gen),
>           beam.pvalue.AsIter(schema_gen),
>           beam.pvalue.AsIter(transform),
>       )
>       model_resolver = root | 'latest_blessed_model_resolver' >> beam.ParDo(
>           LoggingFn('latest_blessed_model_resolver'),
>       )
>       evaluator = root | 'Evaluator' >> beam.ParDo(
>           LoggingFn('Evaluator'),
>           beam.pvalue.AsIter(example_gen),
>           beam.pvalue.AsIter(trainer),
>           beam.pvalue.AsIter(model_resolver),
>       )
>       pusher = root | 'Pusher' >> beam.ParDo(
>           LoggingFn('Pusher'),
>           beam.pvalue.AsIter(trainer),
>           beam.pvalue.AsIter(evaluator),
>       ){code}
>  
>  According to AsIter 
> [documentation|https://github.com/apache/beam/blob/64ec15fa2208d8f9b5ca5653866e1992fd07f7dc/sdks/python/apache_beam/pvalue.py#L527],
>  entire PCollection should be made available as a side input, which means 
> side input PTransform should run before the current PTransform. We used to 
> exploit this feature to run DAG of tasks by injecting task dependency with 
> side inputs, however this mechanism does not work properly in current master 
> (71d7213d98):
> Output with apache-beam==2.33.0:
> {code:java}
> Running CsvExampleGen (side inputs: [])
> Running latest_blessed_model_resolver (side inputs: [])
> Running StatisticsGen (side inputs: [['CsvExampleGen']])
> Running SchemaGen (side inputs: [['StatisticsGen']])
> Running ExampleValidator (side inputs: [['StatisticsGen'], ['SchemaGen']])
> Running Transform (side inputs: [['CsvExampleGen'], ['SchemaGen']])
> Running Trainer (side inputs: [['CsvExampleGen'], ['SchemaGen'], 
> ['Transform']])
> Running Evaluator (side inputs: [['CsvExampleGen'], ['Trainer'], 
> ['latest_blessed_model_resolver']])
> Running Pusher (side inputs: [['Trainer'], ['Evaluator']]){code}
> Output with apache-beam installed from 71d7213d98 (origin/master):
> {code:java}
> Running CsvExampleGen (side inputs: [])
> Running latest_blessed_model_resolver (side inputs: [])
> Running StatisticsGen (side inputs: [['CsvExampleGen']])
> Running Pusher (side inputs: [[], []])
> Running Evaluator (side inputs: [['CsvExampleGen'], [], 
> ['latest_blessed_model_resolver']])
> Running SchemaGen (side inputs: [['StatisticsGen']])
> Running Trainer (side inputs: [['CsvExampleGen'], ['SchemaGen'], []])
> Running ExampleValidator (side inputs: [['StatisticsGen'], ['SchemaGen']])
> Running Transform (side inputs: [['CsvExampleGen'], ['SchemaGen']]){code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to