+Robert Bradshaw <rober...@google.com> I would be happy to debug and fix
this, but I'd need more guidance on where to look.

On Tue, Oct 30, 2018 at 4:07 PM Udi Meiri <eh...@google.com> wrote:

> Created https://issues.apache.org/jira/browse/BEAM-5927
>
> On Tue, Oct 30, 2018 at 1:13 PM Lukasz Cwik <lc...@google.com> wrote:
>
>> Udi, do you know if we have a bug tracking this issue?
>>
>> If not, can you file one referencing this e-mail thread?
>>
>> On Tue, Oct 30, 2018 at 6:33 AM Allie Chen <yifangc...@google.com> wrote:
>>
>>> Thanks, Udi. I agree: the pipeline works fine if I remove either the side
>>> input or the final Flatten and Combine operations.
>>>
>>> On Mon, Oct 29, 2018 at 9:02 PM Udi Meiri <eh...@google.com> wrote:
>>>
>>>> This looks like a FnApiRunner bug.
>>>> When I set use_fnapi_runner = False in direct_runner.py, the pipeline
>>>> works.
>>>>
>>>> It seems like either the side input to _copy_number or the Flatten
>>>> operation is the culprit.
>>>>
>>>> On Mon, Oct 29, 2018 at 2:37 PM Allie Chen <yifangc...@google.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have a project that started failing with DirectRunner but still works
>>>>> with DataflowRunner (the last Beam version where it worked was 2.4). The
>>>>> error I received is:
>>>>> line 1088, in run_stage
>>>>>   pipeline_components.pcollections[actual_pcoll_id].coder_id]]
>>>>> KeyError: u'ref_Coder_WindowedValueCoder_1'
>>>>>
>>>>> I have simplified the pipeline to the following example. Can someone
>>>>> please take a look? Many thanks!
>>>>>
>>>>> Allie
>>>>>
>>>>>
>>>>> import apache_beam as beam
>>>>> import argparse
>>>>> from apache_beam import transforms
>>>>> from apache_beam import pvalue
>>>>> from apache_beam.options import pipeline_options
>>>>>
>>>>>
>>>>> def _copy_number(number, side=None):
>>>>>   yield number
>>>>>
>>>>>
>>>>> def fn_sum(values):
>>>>>   return sum(values)
>>>>>
>>>>>
>>>>> def run(argv=None):
>>>>>   parser = argparse.ArgumentParser()
>>>>>   _, pipeline_args = parser.parse_known_args(argv)
>>>>>   options = pipeline_options.PipelineOptions(pipeline_args)
>>>>>   numbers = [1, 2]
>>>>>   with beam.Pipeline(options=options) as p:
>>>>>     sum_1 = (p
>>>>>              | 'ReadNumber1' >> transforms.Create(numbers)
>>>>>              | 'CalculateSum1' >> beam.CombineGlobally(fn_sum))
>>>>>
>>>>>     sum_2 = (p
>>>>>              | 'ReadNumber2' >> transforms.Create(numbers)
>>>>>              | beam.ParDo(_copy_number, pvalue.AsSingleton(sum_1))
>>>>>              | 'CalculateSum2' >> beam.CombineGlobally(fn_sum))
>>>>>
>>>>>     _ = ((sum_1, sum_2)
>>>>>          | beam.Flatten()
>>>>>          | 'CalculateSum3' >> beam.CombineGlobally(fn_sum)
>>>>>          | beam.io.WriteToText('gs://BUCKET/sum'))
>>>>>
>>>>>
>>>>> if __name__ == '__main__':
>>>>>   run()
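For comparing runners, the value this repro should produce can be worked out without Beam. The sketch below models each step in plain Python, assuming Create([1, 2]) emits exactly those two elements and _copy_number passes each element through unchanged (its side input is accepted but ignored). The names copy_number and expected_output are hypothetical helpers for illustration, not Beam APIs.

```python
# Plain-Python model of the repro pipeline (no Beam required).
# Assumption: Create([1, 2]) emits exactly those elements, and the ParDo
# passes each element through unchanged, ignoring its side input.


def copy_number(number, side=None):
    # Mirrors _copy_number: the side input is accepted but unused.
    yield number


def expected_output(numbers):
    sum_1 = sum(numbers)  # models 'CalculateSum1'
    # models the ParDo that takes sum_1 as a singleton side input
    copied = [out for n in numbers for out in copy_number(n, side=sum_1)]
    sum_2 = sum(copied)  # models 'CalculateSum2'
    flattened = [sum_1, sum_2]  # Flatten of two single-element PCollections
    return sum(flattened)  # models 'CalculateSum3'


print(expected_output([1, 2]))  # prints 6
```

So a correct run should write a single line containing 6 to the output files. Because the logic itself is this simple, the KeyError under DirectRunner is consistent with the diagnosis in the thread that the problem lies in the FnApiRunner rather than in the pipeline.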
