Nicolas Lassaux created BEAM-4633:
-------------------------------------

             Summary: Assert failing: node not reached
                 Key: BEAM-4633
                 URL: https://issues.apache.org/jira/browse/BEAM-4633
             Project: Beam
          Issue Type: Bug
          Components: beam-model
    Affects Versions: 2.4.0
         Environment: macos 10.13.5
            Reporter: Nicolas Lassaux
            Assignee: Kenneth Knowles


I have a pipeline that fails on 2.4.0 but works on 2.3.0. While the pipeline is
being prepared, an assertion fails, apparently because a side input cannot be
accessed.

 

This is the code:
{code:python}
class AverageFn(beam.CombineFn):
    """Combine numeric inputs into their mean, truncated to an int.

    The accumulator is a ``(total, count)`` tuple. ``None`` inputs are
    skipped, so they change neither the total nor the count.
    """

    def create_accumulator(self):
        """Return an empty ``(total, count)`` accumulator."""
        return (0.0, 0)

    def add_input(self, sum_count, input):
        """Fold one input into the accumulator, ignoring ``None`` values.

        The parameter is still called ``input`` (shadowing the builtin) so the
        method signature stays unchanged for keyword callers.
        """
        total, count = sum_count
        if input is None:
            return total, count
        return total + input, count + 1

    def merge_accumulators(self, accumulators):
        """Merge several ``(total, count)`` accumulators into one.

        An empty iterable yields a fresh empty accumulator instead of raising
        ``ValueError`` (``zip(*[])`` cannot be unpacked into two names).
        """
        # Materialize first: the argument may be a one-shot iterator.
        accumulators = list(accumulators)
        if not accumulators:
            return self.create_accumulator()
        totals, counts = zip(*accumulators)
        return sum(totals), sum(counts)

    def extract_output(self, sum_count):
        """Return ``int(total / count)``, or NaN if nothing was counted."""
        total, count = sum_count
        return int(total / count) if count else float('NaN')
{code}
{code:python}
with beam.Pipeline(options=options) as p:
    # Implicit concatenation keeps the query on valid source lines; the SQL
    # text itself is unchanged.
    query = ("SELECT * FROM training_data.1529424357650_not_aggregated "
             "WHERE zip_code='60613'")
    datapoints = p | beam.io.Read(beam.io.BigQuerySource(query=query))

    # Filter the rows, then key each datapoint by its zip code.
    county_datapoints = (
        datapoints
        | 'FilterDatapoints' >> beam.ParDo(FilterDatapoints())
        | 'ZipAsKey' >> beam.Map(lambda x: (x['zip_code'], x)))

    # Creates a collection of tuples: (<zip_code>, <avg_year_built>), where the
    # average is an int (or NaN when no year_built values were counted).
    avg_year_built = (
        county_datapoints
        | 'YearBuiltAsValue' >> beam.Map(lambda x: (x[0], x[1]['year_built']))
        | "CombineYearBuilt" >> beam.CombinePerKey(AverageFn()))

    # The per-zip averages are handed to ExtractFeatures as a dict side input
    # via the avg_year_built keyword argument.
    models = (
        county_datapoints
        | 'ExtractFeatures' >> beam.ParDo(
            ExtractFeatures(),
            avg_year_built=pvalue.AsDict(avg_year_built))
        | 'GroupByZip' >> beam.GroupByKey()
        | 'ComputeModels' >> beam.ParDo(ComputeModel()))

    models | beam.ParDo(SaveToBucket(), bucket=output_gcs_bucket)

{code}
This is the traceback:
{code:java}
Traceback (most recent call last):
File "run.py", line 237, in <module>
run()
File "run.py", line 232, in run
models | beam.ParDo(SaveToBucket(), bucket=output_gcs_bucket)
File 
"/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
 line 389, in __exit__
self.run().wait_until_finish()
File 
"/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
 line 369, in run
self.to_runner_api(), self.runner, self._options).run(False)
File 
"/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
 line 382, in run
return self.runner.run_pipeline(self)
File 
"/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py",
 line 129, in run_pipeline
return runner.run_pipeline(pipeline)
File 
"/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py",
 line 337, in run_pipeline
pipeline.replace_all(_get_transform_overrides(pipeline.options))
File 
"/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
 line 354, in replace_all
self._replace(override)
File 
"/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
 line 284, in _replace
self.visit(TransformUpdater(self))
File 
"/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
 line 410, in visit
self._root_transform().visit(visitor, self, visited)
File 
"/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
 line 764, in visit
part.visit(visitor, pipeline, visited)
File 
"/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
 line 754, in visit
assert pval in visited
AssertionError
{code}
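If I print the pval that is missing from `visited` (the one that makes the
assertion fail), I get: PCollection[CombineYearBuilt/Combine/ParDo(CombineValuesDoFn).None]
This is the output of the CombineYearBuilt step, i.e. the collection used as the
avg_year_built side input.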

If I remove the assertion, the pipeline instead stalls with the following error:
{code:java}
Traceback (most recent call last):
File "run.py", line 237, in <module>
run()
File "run.py", line 232, in run
models | beam.ParDo(SaveToBucket(), bucket=output_gcs_bucket)
File 
"/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/pipeline.py",
 line 389, in __exit__
self.run().wait_until_finish()
File 
"/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py",
 line 414, in wait_until_finish
self._executor.await_completion()
File 
"/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py",
 line 360, in await_completion
self._executor.await_completion()
File 
"/Users/nlassaux-enodo/go/src/github.com/enodoscore/data-science/market-rent/training/env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py",
 line 403, in await_completion
six.reraise(t, v, tb)
File "<string>", line 3, in reraise
Exception: Monitor task detected a pipeline stall.
{code}
 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to