Nicolas Lassaux created BEAM-4633:
-------------------------------------
Summary: Assert failing: node not reached
Key: BEAM-4633
URL: https://issues.apache.org/jira/browse/BEAM-4633
Project: Beam
Issue Type: Bug
Components: beam-model
Affects Versions: 2.4.0
Environment: macos 10.13.5
Reporter: Nicolas Lassaux
Assignee: Kenneth Knowles
I have a pipeline that fails on Beam 2.4.0 but not on 2.3.0. While the pipeline
is being prepared for execution, an assert fails, apparently because a
side input cannot be accessed.
This is the code:
{code:python}
class AverageFn(beam.CombineFn):
def create_accumulator(self):
return (0.0, 0)
def add_input(self, sum_count, input):
(sum, count) = sum_count
if input is None:
return sum, count
return sum + input, count + 1
def merge_accumulators(self, accumulators):
sums, counts = zip(*accumulators)
return sum(sums), sum(counts)
def extract_output(self, sum_count):
(sum, count) = sum_count
return int(sum / count) if count else float('NaN')
{code}
{code:python}
with beam.Pipeline(options=options) as p:
query = "SELECT * FROM training_data.1529424357650_not_aggregated WHERE
zip_code='60613'"
datapoints = p | beam.io.Read(beam.io.BigQuerySource(query=query))
# Filter the datapoints and key each one by its zip code.
county_datapoints = (datapoints
| 'FilterDatapoints' >> beam.ParDo(FilterDatapoints())
| 'ZipAsKey' >> beam.Map(lambda x: (x['zip_code'], x)))
# creates a collection of tuples: (<county>:str, <avg_year_built>:int)
avg_year_built = (county_datapoints
| 'YearBuiltAsValue' >> beam.Map(lambda x: (x[0], x[1]['year_built']))
| "CombineYearBuilt" >> beam.CombinePerKey(AverageFn()))
models = (county_datapoints
| 'ExtractFeatures' >> beam.ParDo(
ExtractFeatures(),
avg_year_built=pvalue.AsDict(avg_year_built))
| 'GroupByZip' >> beam.GroupByKey()
| 'ComputeModels' >> beam.ParDo(ComputeModel()))
models | beam.ParDo(SaveToBucket(), bucket=output_gcs_bucket)
{code}
This is the traceback:
{code:java}
Traceback (most recent call last):
File "run.py", line 237, in <module>
run()
File "run.py", line 232, in run
models | beam.ParDo(SaveToBucket(), bucket=output_gcs_bucket)
File
"/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
line 389, in __exit__
self.run().wait_until_finish()
File
"/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
line 369, in run
self.to_runner_api(), self.runner, self._options).run(False)
File
"/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
line 382, in run
return self.runner.run_pipeline(self)
File
"/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py",
line 129, in run_pipeline
return runner.run_pipeline(pipeline)
File
"/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py",
line 337, in run_pipeline
pipeline.replace_all(_get_transform_overrides(pipeline.options))
File
"/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
line 354, in replace_all
self._replace(override)
File
"/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
line 284, in _replace
self.visit(TransformUpdater(self))
File
"/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
line 410, in visit
self._root_transform().visit(visitor, self, visited)
File
"/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
line 764, in visit
part.visit(visitor, pipeline, visited)
File
"/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
line 754, in visit
assert pval in visited
AssertionError
{code}
If I print the pval that is missing from visited (the one that makes the assert
fail), I get: PCollection[CombineYearBuilt/Combine/ParDo(CombineValuesDoFn).None]
If I remove the assert, I get the following error:
{code:java}
Traceback (most recent call last):
File "run.py", line 237, in <module>
run()
File "run.py", line 232, in run
models | beam.ParDo(SaveToBucket(), bucket=output_gcs_bucket)
File
"/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
line 389, in __exit__
self.run().wait_until_finish()
File
"/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py",
line 414, in wait_until_finish
self._executor.await_completion()
File
"/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py",
line 360, in await_completion
self._executor.await_completion()
File
"/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py",
line 403, in await_completion
six.reraise(t, v, tb)
File "<string>", line 3, in reraise
Exception: Monitor task detected a pipeline stall.
{code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)