[ 
https://issues.apache.org/jira/browse/BEAM-4633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Beam JIRA Bot updated BEAM-4633:
--------------------------------
    Labels: stale-P2  (was: )

> Assert failing: node not reached
> --------------------------------
>
>                 Key: BEAM-4633
>                 URL: https://issues.apache.org/jira/browse/BEAM-4633
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.4.0
>         Environment: macos 10.13.5
>            Reporter: Nicolas Lassaux
>            Priority: P2
>              Labels: stale-P2
>
> I have a pipeline that fails on 2.4.0 but not on 2.3.0. While the pipeline 
> is being prepared, an assert fails, apparently because a side input cannot 
> be accessed.
>  
> This is the code:
> {code:java}
> class AverageFn(beam.CombineFn):
>     def create_accumulator(self):
>         return (0.0, 0)
>     def add_input(self, sum_count, input):
>         (sum, count) = sum_count
>         if input is None:
>             return sum, count
>         return sum + input, count + 1
>     def merge_accumulators(self, accumulators):
>         sums, counts = zip(*accumulators)
>         return sum(sums), sum(counts)
>     def extract_output(self, sum_count):
>         (sum, count) = sum_count
>         return int(sum / count) if count else float('NaN')
> {code}
> {code:java}
> with beam.Pipeline(options=options) as p:
>     query = "SELECT * FROM training_data.1529424357650_not_aggregated WHERE 
> zip_code='60613'"
>     datapoints = p | beam.io.Read(beam.io.BigQuerySource(query=query))
>     # Count the occurrences of each word.
>     county_datapoints = (datapoints
>     | 'FilterDatapoints' >> beam.ParDo(FilterDatapoints())
>     | 'ZipAsKey' >> beam.Map(lambda x: (x['zip_code'], x)))
>     # creates a collection of tuples: (<county>:str, <avg_year_built>:int)
>     avg_year_built = (county_datapoints
>     | 'YearBuiltAsValue' >> beam.Map(lambda x: (x[0], x[1]['year_built']))
>     | "CombineYearBuilt" >> beam.CombinePerKey(AverageFn()))
>     models = (county_datapoints
>     | 'ExtractFeatures' >> beam.ParDo(
>         ExtractFeatures(),
>         avg_year_built=pvalue.AsDict(avg_year_built))
>     | 'GroupByZip' >> beam.GroupByKey()
>     | 'ComputeModels' >> beam.ParDo(ComputeModel()))
>     models | beam.ParDo(SaveToBucket(), bucket=output_gcs_bucket)
> {code}
> This is the traceback:
> {code:java}
> Traceback (most recent call last):
> File "run.py", line 237, in <module>
> run()
> File "run.py", line 232, in run
> models | beam.ParDo(SaveToBucket(), bucket=output_gcs_bucket)
> File 
> "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
>  line 389, in __exit__
> self.run().wait_until_finish()
> File 
> "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
>  line 369, in run
> self.to_runner_api(), self.runner, self._options).run(False)
> File 
> "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
>  line 382, in run
> return self.runner.run_pipeline(self)
> File 
> "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py",
>  line 129, in run_pipeline
> return runner.run_pipeline(pipeline)
> File 
> "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py",
>  line 337, in run_pipeline
> pipeline.replace_all(_get_transform_overrides(pipeline.options))
> File 
> "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
>  line 354, in replace_all
> self._replace(override)
> File 
> "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
>  line 284, in _replace
> self.visit(TransformUpdater(self))
> File 
> "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
>  line 410, in visit
> self._root_transform().visit(visitor, self, visited)
> File 
> "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
>  line 764, in visit
> part.visit(visitor, pipeline, visited)
> File 
> "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
>  line 754, in visit
> assert pval in visited
> AssertionError
> {code}
> If I print the pval that is not in visited (the one that makes the assert 
> fail), this is what I get: 
> PCollection[CombineYearBuilt/Combine/ParDo(CombineValuesDoFn).None]
> If I remove the assert, I get the following error:
> {code:java}
> Traceback (most recent call last):
> File "run.py", line 237, in <module>
> run()
> File "run.py", line 232, in run
> models | beam.ParDo(SaveToBucket(), bucket=output_gcs_bucket)
> File 
> "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
>  line 389, in __exit__
> self.run().wait_until_finish()
> File 
> "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py",
>  line 414, in wait_until_finish
> self._executor.await_completion()
> File 
> "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py",
>  line 360, in await_completion
> self._executor.await_completion()
> File 
> "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py",
>  line 403, in await_completion
> six.reraise(t, v, tb)
> File "<string>", line 3, in reraise
> Exception: Monitor task detected a pipeline stall.
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to