Greetings!

May I ask whether there is any plan to work on this issue? Or if I just use
`BundleBasedDirectRunner` instead of `DirectRunner`, will there be any
performance issues/caveats I should worry about?

Thanks!
Allie

On Tue, Oct 30, 2018 at 8:13 PM Udi Meiri <[email protected]> wrote:

> +Robert Bradshaw <[email protected]> I would be happy to debug and fix
> this, but I'd need more guidance on where to look.
>
> On Tue, Oct 30, 2018 at 4:07 PM Udi Meiri <[email protected]> wrote:
>
>> Created https://issues.apache.org/jira/browse/BEAM-5927
>>
>> On Tue, Oct 30, 2018 at 1:13 PM Lukasz Cwik <[email protected]> wrote:
>>
>>> Udi, do you know if we have a bug tracking this issue?
>>>
>>> If not, can you file one referencing this e-mail thread?
>>>
>>> On Tue, Oct 30, 2018 at 6:33 AM Allie Chen <[email protected]>
>>> wrote:
>>>
>>>> Thanks Udi. I agree, since it works fine after removing either the side
>>>> input or the last Flatten and combine operation.
>>>>
>>>> On Mon, Oct 29, 2018 at 9:02 PM Udi Meiri <[email protected]> wrote:
>>>>
>>>>> This looks like a FnApiRunner bug.
>>>>> When I override use_fnapi_runner = False in direct_runner.py the
>>>>> pipeline works.
>>>>>
>>>>> It seems like either the side-input to _copy_number or the Flatten
>>>>> operation is the culprit.
>>>>>
>>>>> On Mon, Oct 29, 2018 at 2:37 PM Allie Chen <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have a project that started failing with DirectRunner, but works
>>>>>> well using DataflowRunner (last working version is 2.4). The error message
>>>>>> I received is:
>>>>>> line 1088, in run_stage
>>>>>>   pipeline_components.pcollections[actual_pcoll_id].coder_id]]
>>>>>> KeyError: u'ref_Coder_WindowedValueCoder_1'
>>>>>>
>>>>>> I have simplified the pipeline to the following example. Can someone
>>>>>> please take a look? Many thanks!
>>>>>>
>>>>>> Allie
>>>>>>
>>>>>>
>>>>>> import apache_beam as beam
>>>>>> import argparse
>>>>>> from apache_beam import transforms
>>>>>> from apache_beam import pvalue
>>>>>> from apache_beam.options import pipeline_options
>>>>>>
>>>>>>
>>>>>> def _copy_number(number, side=None):
>>>>>>   yield number
>>>>>>
>>>>>>
>>>>>> def fn_sum(values):
>>>>>>   return sum(values)
>>>>>>
>>>>>>
>>>>>> def run(argv=None):
>>>>>>   parser = argparse.ArgumentParser()
>>>>>>   _, pipeline_args = parser.parse_known_args(argv)
>>>>>>   options = pipeline_options.PipelineOptions(pipeline_args)
>>>>>>   numbers = [1, 2]
>>>>>>   with beam.Pipeline(options=options) as p:
>>>>>>     sum_1 = (p
>>>>>>              | 'ReadNumber1' >> transforms.Create(numbers)
>>>>>>              | 'CalculateSum1' >> beam.CombineGlobally(fn_sum))
>>>>>>
>>>>>>     sum_2 = (p
>>>>>>              | 'ReadNumber2' >> transforms.Create(numbers)
>>>>>>              | beam.ParDo(_copy_number, pvalue.AsSingleton(sum_1))
>>>>>>              | 'CalculateSum2' >> beam.CombineGlobally(fn_sum))
>>>>>>
>>>>>>     _ = ((sum_1, sum_2)
>>>>>>          | beam.Flatten()
>>>>>>          | 'CalculateSum3' >> beam.CombineGlobally(fn_sum)
>>>>>>          | beam.io.WriteToText('gs://BUCKET/sum'))
>>>>>>
>>>>>>
>>>>>>
>>>>>>
