This looks like a FnApiRunner bug. When I set use_fnapi_runner = False in direct_runner.py, the pipeline works.
It seems like either the side-input to _copy_number or the Flatten operation is the culprit.

On Mon, Oct 29, 2018 at 2:37 PM Allie Chen <yifangc...@google.com> wrote:

> Hi,
>
> I have a project that started failing with DirectRunner, but works well
> using DataflowRunner (last working version is 2.4). The error message I
> received are:
>
>   line 1088, in run_stage
>     pipeline_components.pcollections[actual_pcoll_id].coder_id]]
> KeyError: u'ref_Coder_WindowedValueCoder_1'
>
> I have simplified the pipeline to the following example. Can someone
> please take a look? Many thanks!
>
> Allie
>
>
> import apache_beam as beam
> import argparse
> from apache_beam import transforms
> from apache_beam import pvalue
> from apache_beam.options import pipeline_options
>
>
> def _copy_number(number, side=None):
>   yield number
>
>
> def fn_sum(values):
>   return sum(values)
>
>
> def run(argv=None):
>   parser = argparse.ArgumentParser()
>   _, pipeline_args = parser.parse_known_args(argv)
>   options = pipeline_options.PipelineOptions(pipeline_args)
>   numbers = [1, 2]
>   with beam.Pipeline(options=options) as p:
>     sum_1 = (p
>              | 'ReadNumber1' >> transforms.Create(numbers)
>              | 'CalculateSum1' >> beam.CombineGlobally(fn_sum))
>
>     sum_2 = (p
>              | 'ReadNumber2' >> transforms.Create(numbers)
>              | beam.ParDo(_copy_number, pvalue.AsSingleton(sum_1))
>              | 'CalculateSum2' >> beam.CombineGlobally(fn_sum))
>
>     _ = ((sum_1, sum_2)
>          | beam.Flatten()
>          | 'CalculateSum3' >> beam.CombineGlobally(fn_sum)
>          | beam.io.WriteToText('gs://BUCKET/sum'))
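
For reference when checking a fix, here is a rough plain-Python sketch of what the quoted pipeline should compute once it runs. It does not use Beam at all; copy_number is a hypothetical stand-in for _copy_number, and the generator expression is only an assumed approximation of ParDo followed by CombineGlobally on a single global window.

```python
# Plain-Python stand-in for the quoted pipeline; no Beam involved.
def fn_sum(values):
    return sum(values)


def copy_number(number, side=None):
    # Mirrors _copy_number: the side input is accepted but ignored.
    yield number


numbers = [1, 2]

# 'CalculateSum1': combine the created numbers globally.
sum_1 = fn_sum(numbers)

# ParDo with sum_1 as a singleton side input, then 'CalculateSum2'.
sum_2 = fn_sum(n for number in numbers for n in copy_number(number, side=sum_1))

# Flatten of (sum_1, sum_2), then 'CalculateSum3'.
sum_3 = fn_sum([sum_1, sum_2])

print(sum_1, sum_2, sum_3)
```

Under those assumptions, the value written by WriteToText should be the third number printed here.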