Hi 8,

This is a bug in the Go SDK's handling of direct output (returning the
result from the DoFn) after GBK. As a workaround, if you change this signature

func(word string, values func(*int) bool) (string, int)

to

func(word string, values func(*int) bool, emit func(string, int))

and emit the result instead of returning it, it works. I opened
https://issues.apache.org/jira/browse/BEAM-3978 for this.
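A minimal sketch of the workaround in plain Go, without the Beam SDK: `countWords` has the emitter-style signature described above and pushes its result through `emit` instead of returning it. The `pair` type and `groupAndCount` function are hypothetical stand-ins for GroupByKey and the iterator a runner passes in; they are not how the direct runner is implemented.

```go
package main

import (
	"fmt"
	"sort"
)

// countWords uses the emitter-style signature from the workaround: the
// result goes to emit rather than being returned as (string, int).
func countWords(word string, values func(*int) bool, emit func(string, int)) {
	sum := 0
	var i int
	// The iterator fills i and reports true until the values run out.
	for values(&i) {
		sum += i
	}
	emit(word, sum)
}

// pair is a hypothetical key/value element.
type pair struct {
	Key   string
	Value int
}

// groupAndCount is a hypothetical in-memory stand-in for GroupByKey
// followed by a ParDo over countWords.
func groupAndCount(pairs []pair) map[string]int {
	grouped := make(map[string][]int)
	for _, p := range pairs {
		grouped[p.Key] = append(grouped[p.Key], p.Value)
	}
	out := make(map[string]int)
	for key, vals := range grouped {
		idx := 0
		// values mimics the func(*int) bool iterator handed to the DoFn.
		values := func(v *int) bool {
			if idx >= len(vals) {
				return false
			}
			*v = vals[idx]
			idx++
			return true
		}
		countWords(key, values, func(w string, c int) { out[w] = c })
	}
	return out
}

func main() {
	var singles []pair
	for _, w := range []string{"a", "b", "a", "c", "a", "b"} {
		singles = append(singles, pair{w, 1})
	}
	counts := groupAndCount(singles)
	keys := make([]string, 0, len(counts))
	for k := range counts {
		keys = append(keys, k)
	}
	sort.Strings(keys) // map order is random; sort for stable output
	for _, k := range keys {
		fmt.Printf("%s: %v\n", k, counts[k])
	}
}
```

Running it prints `a: 3`, `b: 2` and `c: 1`, one per line.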

Thanks,
Henning

PS: minimal_wordcount doesn't log the error returned by direct.Execute
(among other things) and exists mainly to mirror the progression of the Java
examples. It's not a good model for real pipelines.



On Fri, Mar 30, 2018 at 5:21 PM 8 Gianfortoni <8...@tokentransit.com> wrote:

> Oh, I forgot to mention that I pulled from master with this commit as
> latest:
> https://github.com/apache/beam/commit/95a524e52606de1467b5d8b2cc99263b8a111a8d
>
>
>
> On Fri, Mar 30, 2018, 5:09 PM 8 Gianfortoni <8...@tokentransit.com> wrote:
>
>> Fixing the cc to the correct Holden.
>>
>> On Fri, Mar 30, 2018 at 5:05 PM, 8 Gianfortoni <8...@tokentransit.com> wrote:
>>
>>> Hi dev team,
>>>
>>> I'm having a lot of trouble running any pipeline that calls GroupByKey.
>>> Maybe I'm doing something wrong, but for some reason I cannot get
>>> GroupByKey to run without crashing the program.
>>>
>>> I have edited wordcount.go and minimal_wordcount.go to work similarly
>>> to my own program, and they crash as well.
>>>
>>> Here is the snippet of code I added to minimal_wordcount (full source
>>> attached):
>>>
>>>         // Concept #3: Invoke the stats.Count transform on our PCollection of
>>>         // individual words. The Count transform returns a new PCollection of
>>>         // key/value pairs, where each key represents a unique word in the text.
>>>         // The associated value is the occurrence count for that word.
>>>         singles := beam.ParDo(s, func(word string) (string, int) {
>>>                 return word, 1
>>>         }, words)
>>>
>>>         grouped := beam.GroupByKey(s, singles)
>>>
>>>         counted := beam.ParDo(s, func(word string, values func(*int) bool) (string, int) {
>>>                 sum := 0
>>>                 for {
>>>                         var i int
>>>                         if values(&i) {
>>>                                 sum = sum + i
>>>                         } else {
>>>                                 break
>>>                         }
>>>                 }
>>>                 return word, sum
>>>         }, grouped)
>>>
>>>         // Use a ParDo to format our PCollection of word counts into a printable
>>>         // string, suitable for writing to an output file. When each element
>>>         // produces exactly one element, the DoFn can simply return it.
>>>         formatted := beam.ParDo(s, func(w string, c int) string {
>>>                 return fmt.Sprintf("%s: %v", w, c)
>>>         }, counted)
>>>
>>> I also attached the full source code and the output I get when I run
>>> both wordcount and minimal_wordcount.
>>>
>>> Am I just doing something wrong here? In any case, it seems
>>> inappropriate to panic at runtime without any debugging information
>>> (save a stack trace, and only if you call beamx.Run() rather than
>>> direct.Execute(), which just dies without any info).
>>>
>>> Thank you so much,
>>> 8
>>>
>>
>>
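The complaint about panics that surface without debugging information can be illustrated with a minimal sketch of the behavior being asked for: recovering a panic from a user function and returning it as an error that carries the stack trace. `safeInvoke` is a hypothetical helper, not part of the Beam SDK, and this is not how the direct runner handles failures.

```go
package main

import (
	"fmt"
	"runtime/debug"
)

// safeInvoke (hypothetical) runs fn and converts a panic into an error
// that includes the panic value and the goroutine's stack trace.
func safeInvoke(fn func()) (err error) {
	defer func() {
		if r := recover(); r != nil {
			// Overwrite the named result so the caller sees the failure.
			err = fmt.Errorf("panic: %v\n%s", r, debug.Stack())
		}
	}()
	fn()
	return nil
}

func main() {
	err := safeInvoke(func() {
		var m map[string]int
		m["boom"] = 1 // writing to a nil map panics
	})
	if err != nil {
		fmt.Println(err) // prints the panic message followed by the stack
	}
}
```

The named return value is what lets the deferred function replace the `nil` result after `recover` catches the panic.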
