+Robert Bradshaw <rober...@google.com>

I would be happy to debug and fix this, but I'd need more guidance on where to look.
On Tue, Oct 30, 2018 at 4:07 PM Udi Meiri <eh...@google.com> wrote:

> Created https://issues.apache.org/jira/browse/BEAM-5927
>
> On Tue, Oct 30, 2018 at 1:13 PM Lukasz Cwik <lc...@google.com> wrote:
>
>> Udi, do you know if we have a bug tracking this issue?
>>
>> If not, can you file one referencing this e-mail thread?
>>
>> On Tue, Oct 30, 2018 at 6:33 AM Allie Chen <yifangc...@google.com> wrote:
>>
>>> Thanks, Udi. I agree, since it works fine if I remove either the side
>>> input or the last Flatten and combine operation.
>>>
>>> On Mon, Oct 29, 2018 at 9:02 PM Udi Meiri <eh...@google.com> wrote:
>>>
>>>> This looks like a FnApiRunner bug.
>>>> When I override use_fnapi_runner = False in direct_runner.py, the
>>>> pipeline works.
>>>>
>>>> It seems like either the side input to _copy_number or the Flatten
>>>> operation is the culprit.
>>>>
>>>> On Mon, Oct 29, 2018 at 2:37 PM Allie Chen <yifangc...@google.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have a project that started failing with DirectRunner, but works
>>>>> well using DataflowRunner (last working version is 2.4). The error
>>>>> message I received is:
>>>>>
>>>>>   line 1088, in run_stage
>>>>>     pipeline_components.pcollections[actual_pcoll_id].coder_id]]
>>>>> KeyError: u'ref_Coder_WindowedValueCoder_1'
>>>>>
>>>>> I have simplified the pipeline to the following example. Can someone
>>>>> please take a look? Many thanks!
>>>>>
>>>>> Allie
>>>>>
>>>>>
>>>>> import apache_beam as beam
>>>>> import argparse
>>>>> from apache_beam import transforms
>>>>> from apache_beam import pvalue
>>>>> from apache_beam.options import pipeline_options
>>>>>
>>>>>
>>>>> def _copy_number(number, side=None):
>>>>>   yield number
>>>>>
>>>>>
>>>>> def fn_sum(values):
>>>>>   return sum(values)
>>>>>
>>>>>
>>>>> def run(argv=None):
>>>>>   parser = argparse.ArgumentParser()
>>>>>   _, pipeline_args = parser.parse_known_args(argv)
>>>>>   options = pipeline_options.PipelineOptions(pipeline_args)
>>>>>   numbers = [1, 2]
>>>>>   with beam.Pipeline(options=options) as p:
>>>>>     sum_1 = (p
>>>>>              | 'ReadNumber1' >> transforms.Create(numbers)
>>>>>              | 'CalculateSum1' >> beam.CombineGlobally(fn_sum))
>>>>>
>>>>>     sum_2 = (p
>>>>>              | 'ReadNumber2' >> transforms.Create(numbers)
>>>>>              | beam.ParDo(_copy_number, pvalue.AsSingleton(sum_1))
>>>>>              | 'CalculateSum2' >> beam.CombineGlobally(fn_sum))
>>>>>
>>>>>     _ = ((sum_1, sum_2)
>>>>>          | beam.Flatten()
>>>>>          | 'CalculateSum3' >> beam.CombineGlobally(fn_sum)
>>>>>          | beam.io.WriteToText('gs://BUCKET/sum'))
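Anyone checking a fix for BEAM-5927 may find it useful to know what the simplified pipeline should write when it runs correctly. The sketch below is a hypothetical plain-Python trace, not Beam code and not part of the original report. The names `copy_number` and `expected_pipeline_output` are illustrative, and each step mirrors one transform from the repro. Because `_copy_number` passes its input through unchanged and ignores the side input, both branches sum `[1, 2]` to 3. The final combine over the flattened pair should therefore yield 6.

```python
# Hypothetical plain-Python trace of the simplified pipeline's data flow.
# No Beam dependency: each step mirrors one transform from the repro.


def copy_number(number, side=None):
    # Mirrors _copy_number: emits its input unchanged; the side input is unused.
    yield number


def fn_sum(values):
    # Same combine function as in the repro.
    return sum(values)


def expected_pipeline_output(numbers):
    # 'ReadNumber1' -> 'CalculateSum1': one global sum over the input.
    sum_1 = fn_sum(numbers)

    # 'ReadNumber2' -> ParDo(_copy_number, AsSingleton(sum_1)) -> 'CalculateSum2'.
    copied = [out for n in numbers for out in copy_number(n, side=sum_1)]
    sum_2 = fn_sum(copied)

    # Flatten((sum_1, sum_2)) -> 'CalculateSum3': each branch contributes one element.
    flattened = [sum_1, sum_2]
    return fn_sum(flattened)


if __name__ == "__main__":
    print(expected_pipeline_output([1, 2]))  # prints 6
```

Since this trace is only arithmetic, it can serve as the expected value when a fixed DirectRunner is compared with DataflowRunner on the same input.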