https://github.com/apache/beam/pull/7456
On Thu, Jan 10, 2019 at 10:59 AM Robert Bradshaw <rober...@google.com> wrote:
>
> Sorry this got lost. I filed
> https://issues.apache.org/jira/browse/BEAM-6404; hopefully it'll be an
> easy fix.
>
> On Wed, Jan 9, 2019 at 8:33 PM Allie Chen <yifangc...@google.com> wrote:
> >
> > Greetings!
> >
> > May I ask whether there is any plan to work on this issue? Or if I just use
> > `BundleBasedDirectRunner` instead of `DirectRunner`, will there be any
> > performance issues/caveats I should worry about?
> >
> > Thanks!
> > Allie
> >
> > On Tue, Oct 30, 2018 at 8:13 PM Udi Meiri <eh...@google.com> wrote:
> >>
> >> +Robert Bradshaw I would be happy to debug and fix this, but I'd need more
> >> guidance on where to look.
> >>
> >> On Tue, Oct 30, 2018 at 4:07 PM Udi Meiri <eh...@google.com> wrote:
> >>>
> >>> Created https://issues.apache.org/jira/browse/BEAM-5927
> >>>
> >>> On Tue, Oct 30, 2018 at 1:13 PM Lukasz Cwik <lc...@google.com> wrote:
> >>>>
> >>>> Udi, do you know if we have a bug tracking this issue?
> >>>>
> >>>> If not, can you file one referencing this e-mail thread?
> >>>>
> >>>> On Tue, Oct 30, 2018 at 6:33 AM Allie Chen <yifangc...@google.com> wrote:
> >>>>>
> >>>>> Thanks Udi. I agree, since it works fine removing either the side input
> >>>>> or the last flatten and combine operation.
> >>>>>
> >>>>> On Mon, Oct 29, 2018 at 9:02 PM Udi Meiri <eh...@google.com> wrote:
> >>>>>>
> >>>>>> This looks like a FnApiRunner bug.
> >>>>>> When I override use_fnapi_runner = False in direct_runner.py the
> >>>>>> pipeline works.
> >>>>>>
> >>>>>> It seems like either the side-input to _copy_number or the Flatten
> >>>>>> operation is the culprit.
> >>>>>>
> >>>>>> On Mon, Oct 29, 2018 at 2:37 PM Allie Chen <yifangc...@google.com>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> I have a project that started failing with DirectRunner, but works
> >>>>>>> well using DataflowRunner (last working version is 2.4). The error
> >>>>>>> message I received is:
> >>>>>>>   line 1088, in run_stage
> >>>>>>>     pipeline_components.pcollections[actual_pcoll_id].coder_id]]
> >>>>>>> KeyError: u'ref_Coder_WindowedValueCoder_1'
> >>>>>>>
> >>>>>>> I have simplified the pipeline to the following example. Can someone
> >>>>>>> please take a look? Many thanks!
> >>>>>>>
> >>>>>>> Allie
> >>>>>>>
> >>>>>>>
> >>>>>>> import apache_beam as beam
> >>>>>>> import argparse
> >>>>>>> from apache_beam import transforms
> >>>>>>> from apache_beam import pvalue
> >>>>>>> from apache_beam.options import pipeline_options
> >>>>>>>
> >>>>>>>
> >>>>>>> def _copy_number(number, side=None):
> >>>>>>>   yield number
> >>>>>>>
> >>>>>>>
> >>>>>>> def fn_sum(values):
> >>>>>>>   return sum(values)
> >>>>>>>
> >>>>>>>
> >>>>>>> def run(argv=None):
> >>>>>>>   parser = argparse.ArgumentParser()
> >>>>>>>   _, pipeline_args = parser.parse_known_args(argv)
> >>>>>>>   options = pipeline_options.PipelineOptions(pipeline_args)
> >>>>>>>   numbers = [1, 2]
> >>>>>>>   with beam.Pipeline(options=options) as p:
> >>>>>>>     sum_1 = (p
> >>>>>>>              | 'ReadNumber1' >> transforms.Create(numbers)
> >>>>>>>              | 'CalculateSum1' >> beam.CombineGlobally(fn_sum))
> >>>>>>>
> >>>>>>>     sum_2 = (p
> >>>>>>>              | 'ReadNumber2' >> transforms.Create(numbers)
> >>>>>>>              | beam.ParDo(_copy_number, pvalue.AsSingleton(sum_1))
> >>>>>>>              | 'CalculateSum2' >> beam.CombineGlobally(fn_sum))
> >>>>>>>
> >>>>>>>     _ = ((sum_1, sum_2)
> >>>>>>>          | beam.Flatten()
> >>>>>>>          | 'CalculateSum3' >> beam.CombineGlobally(fn_sum)
> >>>>>>>          | beam.io.WriteToText('gs://BUCKET/sum'))
> >>>>>>>
> >>>>>>>
> >>>>>>>