Udi, do you know if we have a bug tracking this issue? If not, can you file one referencing this e-mail thread?
On Tue, Oct 30, 2018 at 6:33 AM Allie Chen <yifangc...@google.com> wrote:

> Thanks, Udi. I agree, since it works fine when I remove either the side
> input or the last Flatten and combine operation.
>
> On Mon, Oct 29, 2018 at 9:02 PM Udi Meiri <eh...@google.com> wrote:
>
>> This looks like a FnApiRunner bug.
>> When I override use_fnapi_runner to False in direct_runner.py, the
>> pipeline works.
>>
>> It seems like either the side input to _copy_number or the Flatten
>> operation is the culprit.
>>
>> On Mon, Oct 29, 2018 at 2:37 PM Allie Chen <yifangc...@google.com> wrote:
>>
>>> Hi,
>>>
>>> I have a project that started failing with DirectRunner but still works
>>> with DataflowRunner (the last version where it worked with DirectRunner
>>> is 2.4). The error message I received is:
>>>
>>>   line 1088, in run_stage
>>>     pipeline_components.pcollections[actual_pcoll_id].coder_id]]
>>> KeyError: u'ref_Coder_WindowedValueCoder_1'
>>>
>>> I have simplified the pipeline to the following example. Can someone
>>> please take a look? Many thanks!
>>>
>>> Allie
>>>
>>>
>>> import apache_beam as beam
>>> import argparse
>>> from apache_beam import transforms
>>> from apache_beam import pvalue
>>> from apache_beam.options import pipeline_options
>>>
>>>
>>> def _copy_number(number, side=None):
>>>   # `side` is unused; the side input only has to be wired in.
>>>   yield number
>>>
>>>
>>> def fn_sum(values):
>>>   return sum(values)
>>>
>>>
>>> def run(argv=None):
>>>   parser = argparse.ArgumentParser()
>>>   _, pipeline_args = parser.parse_known_args(argv)
>>>   options = pipeline_options.PipelineOptions(pipeline_args)
>>>   numbers = [1, 2]
>>>   with beam.Pipeline(options=options) as p:
>>>     sum_1 = (p
>>>              | 'ReadNumber1' >> transforms.Create(numbers)
>>>              | 'CalculateSum1' >> beam.CombineGlobally(fn_sum))
>>>
>>>     # sum_1 is consumed as a singleton side input by _copy_number.
>>>     sum_2 = (p
>>>              | 'ReadNumber2' >> transforms.Create(numbers)
>>>              | beam.ParDo(_copy_number, pvalue.AsSingleton(sum_1))
>>>              | 'CalculateSum2' >> beam.CombineGlobally(fn_sum))
>>>
>>>     # Merge both sums and combine them again before writing the result.
>>>     _ = ((sum_1, sum_2)
>>>          | beam.Flatten()
>>>          | 'CalculateSum3' >> beam.CombineGlobally(fn_sum)
>>>          | beam.io.WriteToText('gs://BUCKET/sum'))
>>>
>>>
>>> if __name__ == '__main__':
>>>   run()
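For reference when checking a fix, here is a plain-Python sketch (no Beam involved, not part of the original thread) of what the reproducer should compute. Each CombineGlobally produces a single value, so sum_1 and sum_2 are both 1 + 2 = 3, and CalculateSum3 over the flattened (3, 3) should yield 6, which is what WriteToText would write. The helper names copy_number and expected_result are hypothetical and only model the data flow, not runner behavior.

```python
# Plain-Python model of the reproducer's data flow (no Beam involved).
# The helper names below are hypothetical; each step mirrors one transform.


def fn_sum(values):
    return sum(values)


def copy_number(number, side=None):
    # Mirrors _copy_number: emits the element unchanged and ignores `side`.
    yield number


def expected_result(numbers):
    # 'ReadNumber1' | 'CalculateSum1'
    sum_1 = fn_sum(numbers)
    # 'ReadNumber2' | ParDo(_copy_number, AsSingleton(sum_1)) | 'CalculateSum2'
    copied = [out for n in numbers for out in copy_number(n, side=sum_1)]
    sum_2 = fn_sum(copied)
    # Flatten (sum_1, sum_2) | 'CalculateSum3'
    return fn_sum([sum_1, sum_2])


if __name__ == "__main__":
    print(expected_result([1, 2]))  # prints 6
```

With the input [1, 2] from the reproducer, a correct run should therefore leave a single line containing 6 in the output file.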