https://github.com/apache/beam/pull/7456

On Thu, Jan 10, 2019 at 10:59 AM Robert Bradshaw <rober...@google.com> wrote:
>
> Sorry this got lost. I filed
> https://issues.apache.org/jira/browse/BEAM-6404; hopefully it'll be an
> easy fix.
>
> On Wed, Jan 9, 2019 at 8:33 PM Allie Chen <yifangc...@google.com> wrote:
> >
> > Greetings!
> >
> > May I ask whether there is any plan to work on this issue? Alternatively, if I
> > just use `BundleBasedDirectRunner` instead of `DirectRunner`, are there any
> > performance issues or caveats I should worry about?
> >
> > Thanks!
> > Allie
> >
> > On Tue, Oct 30, 2018 at 8:13 PM Udi Meiri <eh...@google.com> wrote:
> >>
> >> +Robert Bradshaw I would be happy to debug and fix this, but I'd need more 
> >> guidance on where to look.
> >>
> >> On Tue, Oct 30, 2018 at 4:07 PM Udi Meiri <eh...@google.com> wrote:
> >>>
> >>> Created https://issues.apache.org/jira/browse/BEAM-5927
> >>>
> >>> On Tue, Oct 30, 2018 at 1:13 PM Lukasz Cwik <lc...@google.com> wrote:
> >>>>
> >>>> Udi, do you know if we have a bug tracking this issue?
> >>>>
> >>>> If not, can you file one referencing this e-mail thread?
> >>>>
> >>>> On Tue, Oct 30, 2018 at 6:33 AM Allie Chen <yifangc...@google.com> wrote:
> >>>>>
> >>>>> Thanks, Udi. I agree, since it works fine if I remove either the side
> >>>>> input or the final Flatten and Combine operations.
> >>>>>
> >>>>> On Mon, Oct 29, 2018 at 9:02 PM Udi Meiri <eh...@google.com> wrote:
> >>>>>>
> >>>>>> This looks like a FnApiRunner bug.
> >>>>>> When I set use_fnapi_runner = False in direct_runner.py, the
> >>>>>> pipeline works.
> >>>>>>
> >>>>>> It seems like either the side input to _copy_number or the Flatten
> >>>>>> operation is the culprit.
> >>>>>>
> >>>>>> On Mon, Oct 29, 2018 at 2:37 PM Allie Chen <yifangc...@google.com> 
> >>>>>> wrote:
> >>>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> I have a project that started failing with DirectRunner but still
> >>>>>>> works with DataflowRunner (the last version where it worked with
> >>>>>>> DirectRunner is 2.4). The error message I received is:
> >>>>>>> line 1088, in run_stage
> >>>>>>>   pipeline_components.pcollections[actual_pcoll_id].coder_id]]
> >>>>>>> KeyError: u'ref_Coder_WindowedValueCoder_1'
> >>>>>>>
> >>>>>>> I have simplified the pipeline to the following example. Can someone 
> >>>>>>> please take a look? Many thanks!
> >>>>>>>
> >>>>>>> Allie
> >>>>>>>
> >>>>>>>
> >>>>>>> import apache_beam as beam
> >>>>>>> import argparse
> >>>>>>> from apache_beam import transforms
> >>>>>>> from apache_beam import pvalue
> >>>>>>> from apache_beam.options import pipeline_options
> >>>>>>>
> >>>>>>>
> >>>>>>> def _copy_number(number, side=None):
> >>>>>>>   yield number
> >>>>>>>
> >>>>>>>
> >>>>>>> def fn_sum(values):
> >>>>>>>   return sum(values)
> >>>>>>>
> >>>>>>>
> >>>>>>> def run(argv=None):
> >>>>>>>   parser = argparse.ArgumentParser()
> >>>>>>>   _, pipeline_args = parser.parse_known_args(argv)
> >>>>>>>   options = pipeline_options.PipelineOptions(pipeline_args)
> >>>>>>>   numbers = [1, 2]
> >>>>>>>   with beam.Pipeline(options=options) as p:
> >>>>>>>     sum_1 = (p
> >>>>>>>              | 'ReadNumber1' >> transforms.Create(numbers)
> >>>>>>>              | 'CalculateSum1' >> beam.CombineGlobally(fn_sum))
> >>>>>>>
> >>>>>>>     sum_2 = (p
> >>>>>>>              | 'ReadNumber2' >> transforms.Create(numbers)
> >>>>>>>              | beam.ParDo(_copy_number, pvalue.AsSingleton(sum_1))
> >>>>>>>              | 'CalculateSum2' >> beam.CombineGlobally(fn_sum))
> >>>>>>>
> >>>>>>>     _ = ((sum_1, sum_2)
> >>>>>>>          | beam.Flatten()
> >>>>>>>          | 'CalculateSum3' >> beam.CombineGlobally(fn_sum)
> >>>>>>>          | beam.io.WriteToText('gs://BUCKET/sum'))
> >>>>>>>
> >>>>>>>
> >>>>>>> if __name__ == '__main__':
> >>>>>>>   run()
