Greetings! May I ask whether there is any plan to work on this issue? Or if I just use `BundleBasedDirectRunner` instead of `DirectRunner`, will there be any performance issues/caveats I should worry about?
Thanks!
Allie

On Tue, Oct 30, 2018 at 8:13 PM Udi Meiri <[email protected]> wrote:

> +Robert Bradshaw <[email protected]> I would be happy to debug and fix
> this, but I'd need more guidance on where to look.
>
> On Tue, Oct 30, 2018 at 4:07 PM Udi Meiri <[email protected]> wrote:
>
>> Created https://issues.apache.org/jira/browse/BEAM-5927
>>
>> On Tue, Oct 30, 2018 at 1:13 PM Lukasz Cwik <[email protected]> wrote:
>>
>>> Udi, do you know if we have a bug tracking this issue?
>>>
>>> If not, can you file one referencing this e-mail thread?
>>>
>>> On Tue, Oct 30, 2018 at 6:33 AM Allie Chen <[email protected]>
>>> wrote:
>>>
>>>> Thanks Udi. I agree, since it works fine removing either the side input
>>>> or the last flatten and combine operation.
>>>>
>>>> On Mon, Oct 29, 2018 at 9:02 PM Udi Meiri <[email protected]> wrote:
>>>>
>>>>> This looks like a FnApiRunner bug.
>>>>> When I override use_fnapi_runner = False in direct_runner.py the
>>>>> pipeline works.
>>>>>
>>>>> It seems like either the side-input to _copy_number or the Flatten
>>>>> operation is the culprit.
>>>>>
>>>>> On Mon, Oct 29, 2018 at 2:37 PM Allie Chen <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have a project that started failing with DirectRunner, but works
>>>>>> well using DataflowRunner (last working version is 2.4). The error
>>>>>> message I received is:
>>>>>>   line 1088, in run_stage
>>>>>>     pipeline_components.pcollections[actual_pcoll_id].coder_id]]
>>>>>> KeyError: u'ref_Coder_WindowedValueCoder_1'
>>>>>>
>>>>>> I have simplified the pipeline to the following example. Can someone
>>>>>> please take a look? Many thanks!
>>>>>>
>>>>>> Allie
>>>>>>
>>>>>>
>>>>>> import apache_beam as beam
>>>>>> import argparse
>>>>>> from apache_beam import transforms
>>>>>> from apache_beam import pvalue
>>>>>> from apache_beam.options import pipeline_options
>>>>>>
>>>>>>
>>>>>> def _copy_number(number, side=None):
>>>>>>   yield number
>>>>>>
>>>>>>
>>>>>> def fn_sum(values):
>>>>>>   return sum(values)
>>>>>>
>>>>>>
>>>>>> def run(argv=None):
>>>>>>   parser = argparse.ArgumentParser()
>>>>>>   _, pipeline_args = parser.parse_known_args(argv)
>>>>>>   options = pipeline_options.PipelineOptions(pipeline_args)
>>>>>>   numbers = [1, 2]
>>>>>>   with beam.Pipeline(options=options) as p:
>>>>>>     sum_1 = (p
>>>>>>              | 'ReadNumber1' >> transforms.Create(numbers)
>>>>>>              | 'CalculateSum1' >> beam.CombineGlobally(fn_sum))
>>>>>>
>>>>>>     sum_2 = (p
>>>>>>              | 'ReadNumber2' >> transforms.Create(numbers)
>>>>>>              | beam.ParDo(_copy_number, pvalue.AsSingleton(sum_1))
>>>>>>              | 'CalculateSum2' >> beam.CombineGlobally(fn_sum))
>>>>>>
>>>>>>     _ = ((sum_1, sum_2)
>>>>>>          | beam.Flatten()
>>>>>>          | 'CalculateSum3' >> beam.CombineGlobally(fn_sum)
>>>>>>          | beam.io.WriteToText('gs://BUCKET/sum'))
