Udi Meiri created BEAM-5927:
-------------------------------
Summary: FnApiRunner KeyError
Key: BEAM-5927
URL: https://issues.apache.org/jira/browse/BEAM-5927
Project: Beam
Issue Type: Bug
Components: sdk-py-core
Reporter: Udi Meiri
Assignee: Ahmet Altay
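Code to reproduce the KeyError shown below; note that sum_1 is used both as a singleton side input and as a Flatten input (slightly modified from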
https://lists.apache.org/thread.html/e910bfe702a3c8c5b0902f5f1c2c51fb7b2574f1b4abc4d9efab4e0f@%3Cdev.beam.apache.org%3E):
import apache_beam as beam
import argparse
from apache_beam import transforms
from apache_beam import pvalue
from apache_beam.options import pipeline_options
import logging
def _copy_number(number, side=None):
print '_copy_number:', number, side
yield number
def fn_sum(values):
    """Return the sum of ``values``; used as a ``CombineGlobally`` function.

    Args:
      values: Iterable of numbers to add.

    Returns:
      The sum of ``values`` (``0`` when ``values`` is empty).
    """
    return sum(values)
def run(argv=None):
    """Build and run the sum / side-input / flatten reproducer pipeline.

    Args:
      argv: Command-line arguments to parse; ``None`` lets argparse read
        ``sys.argv[1:]``. No flags are declared, so every argument is
        forwarded to ``PipelineOptions``.
    """
    arg_parser = argparse.ArgumentParser()
    beam_args = arg_parser.parse_known_args(argv)[1]
    opts = pipeline_options.PipelineOptions(beam_args)
    # To try streaming mode, set
    # opts.view_as(pipeline_options.StandardOptions).streaming = True.

    inputs = [1, 2]
    with beam.Pipeline(options=opts) as pipeline:
        # First total: the inputs summed globally.
        first_total = (
            pipeline
            | 'ReadNumber1' >> transforms.Create(inputs)
            | 'CalculateSum1' >> beam.CombineGlobally(fn_sum))

        # Second total: the same inputs passed through _copy_number with
        # first_total as a singleton side input, then summed globally.
        second_total = (
            pipeline
            | 'ReadNumber2' >> transforms.Create(inputs)
            | beam.ParDo(_copy_number, pvalue.AsSingleton(first_total))
            | 'CalculateSum2' >> beam.CombineGlobally(fn_sum))

        # Flatten both totals, sum them, and write the result as text.
        _ = (
            (first_total, second_total)
            | beam.Flatten()
            | 'CalculateSum3' >> beam.CombineGlobally(fn_sum)
            | beam.io.WriteToText('gs://foo/sum'))
if __name__ == '__main__':
    # Configure logging and launch the pipeline only when executed as a
    # script, so importing this module does not change the root log level
    # or start a pipeline run.
    logging.getLogger().setLevel(logging.INFO)
    run()
Console:
$ python test.py
INFO:root:Missing pipeline option (runner). Executing pipeline using the
default runner: DirectRunner.
INFO:root:==================== <function annotate_downstream_side_inputs at
0x7f2b7088e758> ====================
INFO:root:==================== <function fix_side_input_pcoll_coders at
0x7f2b7088e7d0> ====================
INFO:root:==================== <function lift_combiners at 0x7f2b7088e5f0>
====================
INFO:root:==================== <function expand_gbk at 0x7f2b7088e668>
====================
INFO:root:==================== <function sink_flattens at 0x7f2b7088e6e0>
====================
INFO:root:==================== <function greedily_fuse at 0x7f2b7088e848>
====================
INFO:root:==================== <function impulse_to_input at 0x7f2b7088e578>
====================
INFO:root:==================== <function inject_timer_pcollections at
0x7f2b7088e8c0> ====================
INFO:root:==================== <function sort_stages at 0x7f2b7088e938>
====================
INFO:root:Running
((ref_AppliedPTransform_ReadNumber1/Read_3)+((ref_AppliedPTransform_CalculateSum1/KeyWithVoid_5)+(CalculateSum1/CombinePerKey/Precombine)))+(CalculateSum1/CombinePerKey/Group/Write)
INFO:root:Running
((CalculateSum1/CombinePerKey/Group/Read)+(CalculateSum1/CombinePerKey/Merge))+((CalculateSum1/CombinePerKey/ExtractOutputs)+((ref_AppliedPTransform_CalculateSum1/UnKey_13)+(ref_PCollection_PCollection_7/Write)))
INFO:root:Running
((ref_AppliedPTransform_CalculateSum1/DoOnce/Read_15)+(((ref_AppliedPTransform_CalculateSum1/InjectDefault_16)+(ref_PCollection_PCollection_9/Write))+(Flatten/Transcode/0)))+(Flatten/Write/0)
INFO:root:Running
((ref_AppliedPTransform_ReadNumber2/Read_18)+((ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_19)+((ref_AppliedPTransform_CalculateSum2/KeyWithVoid_21)+(CalculateSum2/CombinePerKey/Precombine))))+(CalculateSum2/CombinePerKey/Group/Write)
Traceback (most recent call last):
File "test.py", line 41, in <module>
run()
File "test.py", line 38, in run
| beam.io.WriteToText('gs://foo/sum'))
File
"/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py",
line 425, in __exit__
self.run().wait_until_finish()
File
"/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py",
line 405, in run
self._options).run(False)
File
"/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py",
line 418, in run
return self.runner.run_pipeline(self)
File
"/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/direct/direct_runner.py",
line 139, in run_pipeline
return runner.run_pipeline(pipeline)
File
"/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
line 238, in run_pipeline
return self.run_via_runner_api(pipeline.to_runner_api())
File
"/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
line 241, in run_via_runner_api
return self.run_stages(*self.create_stages(pipeline_proto))
File
"/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
line 1020, in run_stages
pcoll_buffers, safe_coders)
File
"/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
line 1096, in run_stage
pipeline_components.pcollections[actual_pcoll_id].coder_id]]
KeyError: u'coder_4'
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)