[ https://issues.apache.org/jira/browse/BEAM-4633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Beam JIRA Bot updated BEAM-4633:
--------------------------------
Labels: stale-P2 (was: )
> Assert failing: node not reached
> --------------------------------
>
> Key: BEAM-4633
> URL: https://issues.apache.org/jira/browse/BEAM-4633
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Affects Versions: 2.4.0
> Environment: macos 10.13.5
> Reporter: Nicolas Lassaux
> Priority: P2
> Labels: stale-P2
>
> I have a pipeline that fails on Beam 2.4.0 but works on 2.3.0. While the
> pipeline is being prepared (in the DirectRunner's pipeline.replace_all() call,
> according to the traceback below), an assertion fails, apparently because a
> side input cannot be reached.
>
> This is the code:
> {code:python}
> import apache_beam as beam
> from apache_beam import pvalue
>
> class AverageFn(beam.CombineFn):
>     def create_accumulator(self):
>         # Accumulator is a (running total, number of values) pair.
>         return (0.0, 0)
>
>     def add_input(self, sum_count, value):
>         total, count = sum_count
>         if value is None:  # skip missing values
>             return total, count
>         return total + value, count + 1
>
>     def merge_accumulators(self, accumulators):
>         totals, counts = zip(*accumulators)
>         return sum(totals), sum(counts)
>
>     def extract_output(self, sum_count):
>         total, count = sum_count
>         # Integer average, or NaN when a key had no non-None values.
>         return int(total / count) if count else float('NaN')
> {code}
> {code:python}
> # FilterDatapoints, ExtractFeatures, ComputeModel, SaveToBucket, options and
> # output_gcs_bucket are defined elsewhere in run.py (not shown in this report).
> with beam.Pipeline(options=options) as p:
>     query = ("SELECT * FROM training_data.1529424357650_not_aggregated "
>              "WHERE zip_code='60613'")
>     datapoints = p | beam.io.Read(beam.io.BigQuerySource(query=query))
>
>     # Key each datapoint by its zip code: (zip_code, row).
>     county_datapoints = (datapoints
>                          | 'FilterDatapoints' >> beam.ParDo(FilterDatapoints())
>                          | 'ZipAsKey' >> beam.Map(lambda x: (x['zip_code'], x)))
>
>     # Creates a collection of tuples: (<zip_code>:str, <avg_year_built>:int).
>     avg_year_built = (county_datapoints
>                       | 'YearBuiltAsValue' >> beam.Map(lambda x: (x[0], x[1]['year_built']))
>                       | 'CombineYearBuilt' >> beam.CombinePerKey(AverageFn()))
>
>     # avg_year_built is consumed by ExtractFeatures as a dict side input.
>     models = (county_datapoints
>               | 'ExtractFeatures' >> beam.ParDo(
>                   ExtractFeatures(),
>                   avg_year_built=pvalue.AsDict(avg_year_built))
>               | 'GroupByZip' >> beam.GroupByKey()
>               | 'ComputeModels' >> beam.ParDo(ComputeModel()))
>
>     models | beam.ParDo(SaveToBucket(), bucket=output_gcs_bucket)
> {code}
> This is the traceback:
> {noformat}
> Traceback (most recent call last):
>   File "run.py", line 237, in <module>
>     run()
>   File "run.py", line 232, in run
>     models | beam.ParDo(SaveToBucket(), bucket=output_gcs_bucket)
>   File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py", line 389, in __exit__
>     self.run().wait_until_finish()
>   File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py", line 369, in run
>     self.to_runner_api(), self.runner, self._options).run(False)
>   File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py", line 382, in run
>     return self.runner.run_pipeline(self)
>   File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 129, in run_pipeline
>     return runner.run_pipeline(pipeline)
>   File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 337, in run_pipeline
>     pipeline.replace_all(_get_transform_overrides(pipeline.options))
>   File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py", line 354, in replace_all
>     self._replace(override)
>   File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py", line 284, in _replace
>     self.visit(TransformUpdater(self))
>   File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py", line 410, in visit
>     self._root_transform().visit(visitor, self, visited)
>   File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py", line 764, in visit
>     part.visit(visitor, pipeline, visited)
>   File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py", line 754, in visit
>     assert pval in visited
> AssertionError
> {noformat}
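> Printing the pval that is missing from visited (the value that makes the
> assertion fail) gives PCollection[CombineYearBuilt/Combine/ParDo(CombineValuesDoFn).None],
> i.e. the output of the CombineYearBuilt step, which is used as the
> avg_year_built side input.
> With the assertion removed, the pipeline gets past construction but then
> stalls during execution with the following error: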
> {noformat}
> Traceback (most recent call last):
>   File "run.py", line 237, in <module>
>     run()
>   File "run.py", line 232, in run
>     models | beam.ParDo(SaveToBucket(), bucket=output_gcs_bucket)
>   File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py", line 389, in __exit__
>     self.run().wait_until_finish()
>   File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 414, in wait_until_finish
>     self._executor.await_completion()
>   File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py", line 360, in await_completion
>     self._executor.await_completion()
>   File "/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py", line 403, in await_completion
>     six.reraise(t, v, tb)
>   File "<string>", line 3, in reraise
> Exception: Monitor task detected a pipeline stall.
> {noformat}
>
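To check what the avg_year_built side input in the report should contain, independently of any runner, the AverageFn logic can be driven by hand. This is a plain-Python sketch, not Beam code: the beam.CombineFn base class is swapped for object, combine_per_key is a hypothetical helper that splits each key's values into bundles, accumulates each bundle, then merges and extracts (roughly the lifecycle a runner applies to a CombineFn), and the sample (zip_code, year_built) pairs are made up.

```python
# Plain-Python sketch (no Beam): drive AverageFn's four methods by hand to see
# what CombinePerKey(AverageFn()) should produce. The base class is object,
# not beam.CombineFn, and all sample data below is hypothetical.
from collections import defaultdict


class AverageFn(object):
    """Same logic as the AverageFn in the report, with builtins not shadowed."""

    def create_accumulator(self):
        return (0.0, 0)

    def add_input(self, sum_count, value):
        total, count = sum_count
        if value is None:  # skip missing values
            return total, count
        return total + value, count + 1

    def merge_accumulators(self, accumulators):
        totals, counts = zip(*accumulators)
        return sum(totals), sum(counts)

    def extract_output(self, sum_count):
        total, count = sum_count
        return int(total / count) if count else float('NaN')


def combine_per_key(pairs, fn, num_bundles=2):
    """Hypothetical helper: group values by key, accumulate each of
    num_bundles interleaved slices separately, merge, then extract."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    result = {}
    for key, values in grouped.items():
        accumulators = []
        for i in range(num_bundles):
            acc = fn.create_accumulator()
            for value in values[i::num_bundles]:
                acc = fn.add_input(acc, value)
            accumulators.append(acc)
        result[key] = fn.extract_output(fn.merge_accumulators(accumulators))
    return result


# Hypothetical (zip_code, year_built) pairs, shaped like YearBuiltAsValue output.
sample = [('60613', 1990), ('60613', 2000), ('60613', None), ('60614', 1975)]
avg_year_built = combine_per_key(sample, AverageFn())
print(avg_year_built)  # {'60613': 1995, '60614': 1975}
```

If the per-key averages come out as expected, that would suggest the CombineFn itself is fine and the problem lies in how the graph is prepared, which is where the traceback shows the failure.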
>
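The failing check in the report is `assert pval in visited` inside `visit` in pipeline.py, reached from the DirectRunner through replace_all and _replace. The toy model below (hypothetical PCollection and Transform classes, not Beam's implementation) sketches a depth-first walk consistent with that traceback: before handling a transform, visit the producer of each of its inputs, then expect that input to be marked as visited, because visiting a transform marks its outputs. The "broken graph" step simulates a rewrite by clearing the producer's outputs; that is an assumption for illustration, not a claim about what the 2.4.0 overrides actually do.

```python
# Toy model (hypothetical classes, not Beam's) of a depth-first graph walk
# that asserts each input was reached through its producer.


class PCollection(object):
    def __init__(self, label):
        self.label = label
        self.producer = None  # the Transform that outputs this PCollection

    def __repr__(self):
        return 'PCollection[%s]' % self.label


class Transform(object):
    def __init__(self, name, inputs=()):
        self.name = name
        self.inputs = list(inputs)  # main and side inputs alike
        self.outputs = []

    def add_output(self, label):
        pcoll = PCollection(label)
        pcoll.producer = self
        self.outputs.append(pcoll)
        return pcoll

    def visit(self, visited):
        if self in visited:
            return
        visited.add(self)
        for pval in self.inputs:
            if pval not in visited and pval.producer is not None:
                pval.producer.visit(visited)
                # Visiting the producer should have marked pval as visited.
                assert pval in visited, pval
        # Visiting a transform marks all of its outputs as visited.
        visited.update(self.outputs)


combine = Transform('CombineYearBuilt')
avg = combine.add_output('CombineYearBuilt/Combine/ParDo(CombineValuesDoFn).None')
extract = Transform('ExtractFeatures', inputs=[avg])

# Healthy graph: the side input is reached through its producer.
extract.visit(set())

# Simulated broken graph: the recorded producer no longer lists the
# PCollection among its outputs, so the walk never marks it as visited.
combine.outputs = []
try:
    extract.visit(set())
except AssertionError as e:
    # Under Python 3 this prints:
    # AssertionError: PCollection[CombineYearBuilt/Combine/ParDo(CombineValuesDoFn).None]
    print('AssertionError:', e)
```

The model only shows one way a "node not reached" assertion of this shape can fire; it does not explain why 2.4.0 leaves the CombineYearBuilt output unreachable while 2.3.0 does not.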
--
This message was sent by Atlassian Jira
(v8.3.4#803005)