This looks like a FnApiRunner bug.
When I set use_fnapi_runner = False in direct_runner.py, the pipeline
works.

It seems like either the side input to _copy_number or the Flatten
operation is the culprit.

On Mon, Oct 29, 2018 at 2:37 PM Allie Chen <yifangc...@google.com> wrote:

> Hi,
>
> I have a project that started failing with DirectRunner (the last working
> version is 2.4), but it still works well with DataflowRunner. The error
> message I received is:
> line 1088, in run_stage
>   pipeline_components.pcollections[actual_pcoll_id].coder_id]]
> KeyError: u'ref_Coder_WindowedValueCoder_1'
>
> I have simplified the pipeline to the following example. Can someone
> please take a look? Many thanks!
>
> Allie
>
>
> import apache_beam as beam
> import argparse
> from apache_beam import transforms
> from apache_beam import pvalue
> from apache_beam.options import pipeline_options
>
>
> def _copy_number(number, side=None):
>   # 'side' is deliberately unused; it only makes this step depend on sum_1.
>   yield number
>
>
> def fn_sum(values):
>   return sum(values)
>
>
> def run(argv=None):
>   parser = argparse.ArgumentParser()
>   _, pipeline_args = parser.parse_known_args(argv)
>   options = pipeline_options.PipelineOptions(pipeline_args)
>   numbers = [1, 2]
>   with beam.Pipeline(options=options) as p:
>     sum_1 = (p
>              | 'ReadNumber1' >> transforms.Create(numbers)
>              | 'CalculateSum1' >> beam.CombineGlobally(fn_sum))
>
>     sum_2 = (p
>              | 'ReadNumber2' >> transforms.Create(numbers)
>              | beam.ParDo(_copy_number, pvalue.AsSingleton(sum_1))
>              | 'CalculateSum2' >> beam.CombineGlobally(fn_sum))
>
>     _ = ((sum_1, sum_2)
>          | beam.Flatten()
>          | 'CalculateSum3' >> beam.CombineGlobally(fn_sum)
>          | beam.io.WriteToText('gs://BUCKET/sum'))
>
>
> if __name__ == '__main__':
>   run()
