[ https://issues.apache.org/jira/browse/BEAM-5927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Robert Bradshaw resolved BEAM-5927.
-----------------------------------
Resolution: Duplicate
Fix Version/s: Not applicable
> FnApiRunner KeyError
> --------------------
>
> Key: BEAM-5927
> URL: https://issues.apache.org/jira/browse/BEAM-5927
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Udi Meiri
> Assignee: Robert Bradshaw
> Priority: Major
> Fix For: Not applicable
>
>
> Code to recreate (modified slightly from
> https://lists.apache.org/thread.html/e910bfe702a3c8c5b0902f5f1c2c51fb7b2574f1b4abc4d9efab4e0f@%3Cdev.beam.apache.org%3E):
> import apache_beam as beam
> import argparse
> from apache_beam import transforms
> from apache_beam import pvalue
> from apache_beam.options import pipeline_options
> import logging
> def _copy_number(number, side=None):
>     print '_copy_number:', number, side
>     yield number
> def fn_sum(values):
>     #print 'values', values
>     return sum(values)
> def run(argv=None):
>     parser = argparse.ArgumentParser()
>     _, pipeline_args = parser.parse_known_args(argv)
>     options = pipeline_options.PipelineOptions(pipeline_args)
>     #options.view_as(pipeline_options.StandardOptions).streaming = True
>     numbers = [1, 2]
>     with beam.Pipeline(options=options) as p:
>         sum_1 = (p
>                  | 'ReadNumber1' >> transforms.Create(numbers)
>                  | 'CalculateSum1' >> beam.CombineGlobally(fn_sum))
>         sum_2 = (p
>                  | 'ReadNumber2' >> transforms.Create(numbers)
>                  | beam.ParDo(_copy_number, pvalue.AsSingleton(sum_1))
>                  | 'CalculateSum2' >> beam.CombineGlobally(fn_sum))
>         _ = ((sum_1, sum_2)
>              | beam.Flatten()
>              | 'CalculateSum3' >> beam.CombineGlobally(fn_sum)
>              | beam.io.WriteToText('gs://foo/sum'))
> logging.getLogger().setLevel(logging.INFO)
> run()
> Console:
> $ python test.py
> INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
> INFO:root:==================== <function annotate_downstream_side_inputs at 0x7f2b7088e758> ====================
> INFO:root:==================== <function fix_side_input_pcoll_coders at 0x7f2b7088e7d0> ====================
> INFO:root:==================== <function lift_combiners at 0x7f2b7088e5f0> ====================
> INFO:root:==================== <function expand_gbk at 0x7f2b7088e668> ====================
> INFO:root:==================== <function sink_flattens at 0x7f2b7088e6e0> ====================
> INFO:root:==================== <function greedily_fuse at 0x7f2b7088e848> ====================
> INFO:root:==================== <function impulse_to_input at 0x7f2b7088e578> ====================
> INFO:root:==================== <function inject_timer_pcollections at 0x7f2b7088e8c0> ====================
> INFO:root:==================== <function sort_stages at 0x7f2b7088e938> ====================
> INFO:root:Running ((ref_AppliedPTransform_ReadNumber1/Read_3)+((ref_AppliedPTransform_CalculateSum1/KeyWithVoid_5)+(CalculateSum1/CombinePerKey/Precombine)))+(CalculateSum1/CombinePerKey/Group/Write)
> INFO:root:Running ((CalculateSum1/CombinePerKey/Group/Read)+(CalculateSum1/CombinePerKey/Merge))+((CalculateSum1/CombinePerKey/ExtractOutputs)+((ref_AppliedPTransform_CalculateSum1/UnKey_13)+(ref_PCollection_PCollection_7/Write)))
> INFO:root:Running ((ref_AppliedPTransform_CalculateSum1/DoOnce/Read_15)+(((ref_AppliedPTransform_CalculateSum1/InjectDefault_16)+(ref_PCollection_PCollection_9/Write))+(Flatten/Transcode/0)))+(Flatten/Write/0)
> INFO:root:Running ((ref_AppliedPTransform_ReadNumber2/Read_18)+((ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_19)+((ref_AppliedPTransform_CalculateSum2/KeyWithVoid_21)+(CalculateSum2/CombinePerKey/Precombine))))+(CalculateSum2/CombinePerKey/Group/Write)
> Traceback (most recent call last):
>   File "test.py", line 41, in <module>
>     run()
>   File "test.py", line 38, in run
>     | beam.io.WriteToText('gs://foo/sum'))
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py", line 425, in __exit__
>     self.run().wait_until_finish()
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py", line 405, in run
>     self._options).run(False)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py", line 418, in run
>     return self.runner.run_pipeline(self)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", line 139, in run_pipeline
>     return runner.run_pipeline(pipeline)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 238, in run_pipeline
>     return self.run_via_runner_api(pipeline.to_runner_api())
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 241, in run_via_runner_api
>     return self.run_stages(*self.create_stages(pipeline_proto))
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 1020, in run_stages
>     pcoll_buffers, safe_coders)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 1096, in run_stage
>     pipeline_components.pcollections[actual_pcoll_id].coder_id]]
> KeyError: u'coder_4'
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)