Hi dev team, I'm having a lot of trouble running any pipeline that calls GroupByKey. Maybe I'm doing something wrong, but I cannot get a pipeline that uses GroupByKey to run without crashing.
I have edited wordcount.go and minimal_wordcount.go to work like my own program, and they crash as well. Here is the snippet of code I added to minimal_wordcount (full source attached):

    // Concept #3: Invoke the stats.Count transform on our PCollection of
    // individual words. The Count transform returns a new PCollection of
    // key/value pairs, where each key represents a unique word in the text.
    // The associated value is the occurrence count for that word.
    singles := beam.ParDo(s, func(word string) (string, int) {
        return word, 1
    }, words)
    grouped := beam.GroupByKey(s, singles)
    counted := beam.ParDo(s, func(word string, values func(*int) bool) (string, int) {
        sum := 0
        for {
            var i int
            if values(&i) {
                sum = sum + i
            } else {
                break
            }
        }
        return word, sum
    }, grouped)

    // Use a ParDo to format our PCollection of word counts into a printable
    // string, suitable for writing to an output file. When each element
    // produces exactly one element, the DoFn can simply return it.
    formatted := beam.ParDo(s, func(w string, c int) string {
        return fmt.Sprintf("%s: %v", w, c)
    }, counted)

I have also attached the full source code and the output I get when running both wordcount and minimal_wordcount.

Am I just doing something wrong here? In any case, it seems inappropriate to panic at runtime without any debugging information beyond a stack trace, and even that only appears if you call beamx.Run(); calling direct.Execute() just dies without any information.

Thank you so much,
8
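To rule out the per-key DoFn body itself, its summing loop can be exercised outside Beam by handing it a hand-built iterator of the same `func(*int) bool` shape that the DoFn receives after GroupByKey. This is a minimal sketch using only the standard library; `sliceIter` and `sumCounts` are hypothetical names for illustration, not Beam APIs, and the sketch says nothing about how the runner actually builds its iterators.

```go
package main

import "fmt"

// sliceIter is a hypothetical helper (not part of Beam) that builds an
// iterator with the same shape the grouped DoFn receives: each call copies
// the next value into *out and reports whether a value was available.
func sliceIter(vals []int) func(*int) bool {
	i := 0
	return func(out *int) bool {
		if i >= len(vals) {
			return false
		}
		*out = vals[i]
		i++
		return true
	}
}

// sumCounts mirrors the body of the per-key DoFn from the snippet, with the
// for/if/break loop folded into the loop condition.
func sumCounts(word string, values func(*int) bool) (string, int) {
	sum := 0
	var i int
	for values(&i) {
		sum += i
	}
	return word, sum
}

func main() {
	w, n := sumCounts("king", sliceIter([]int{1, 1, 1}))
	fmt.Printf("%s: %v\n", w, n) // prints "king: 3"
}
```

If the loop behaves as expected here, that is consistent with the trace, whose innermost Beam frame is `exec.Invoke` (fn.go:68) with no user-function frame above it; this is an observation from the trace, not a confirmed diagnosis.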
    [{6: KV<string,int>/GW/KV<bytes,int[varintz]>}]
    [{10: KV<int,string>/GW/KV<int[varintz],bytes>}]
    2018/03/30 16:32:15 Pipeline:
    2018/03/30 16:32:15 Nodes: {1: []uint8/GW/bytes}
    {2: string/GW/bytes}
    {3: string/GW/bytes}
    {4: string/GW/bytes}
    {5: string/GW/bytes}
    {6: KV<string,int>/GW/KV<bytes,int[varintz]>}
    {7: CoGBK<string,int>/GW/CoGBK<bytes,int[varintz]>}
    {8: KV<string,int>/GW/KV<bytes,int[varintz]>}
    {9: string/GW/bytes}
    {10: KV<int,string>/GW/KV<int[varintz],bytes>}
    {11: CoGBK<int,string>/GW/CoGBK<int[varintz],bytes>}
    Edges: 1: Impulse [] -> [Out: []uint8 -> {1: []uint8/GW/bytes}]
    2: ParDo [In(Main): []uint8 <- {1: []uint8/GW/bytes}] -> [Out: T -> {2: string/GW/bytes}]
    3: ParDo [In(Main): string <- {2: string/GW/bytes}] -> [Out: string -> {3: string/GW/bytes}]
    4: ParDo [In(Main): string <- {3: string/GW/bytes}] -> [Out: string -> {4: string/GW/bytes}]
    5: ParDo [In(Main): string <- {4: string/GW/bytes}] -> [Out: string -> {5: string/GW/bytes}]
    6: ParDo [In(Main): string <- {5: string/GW/bytes}] -> [Out: KV<string,int> -> {6: KV<string,int>/GW/KV<bytes,int[varintz]>}]
    7: CoGBK [In(Main): KV<string,int> <- {6: KV<string,int>/GW/KV<bytes,int[varintz]>}] -> [Out: CoGBK<string,int> -> {7: CoGBK<string,int>/GW/CoGBK<bytes,int[varintz]>}]
    8: ParDo [In(Main): CoGBK<string,int> <- {7: CoGBK<string,int>/GW/CoGBK<bytes,int[varintz]>}] -> [Out: KV<string,int> -> {8: KV<string,int>/GW/KV<bytes,int[varintz]>}]
    9: ParDo [In(Main): KV<string,int> <- {8: KV<string,int>/GW/KV<bytes,int[varintz]>}] -> [Out: string -> {9: string/GW/bytes}]
    10: ParDo [In(Main): T <- {9: string/GW/bytes}] -> [Out: KV<int,T> -> {10: KV<int,string>/GW/KV<int[varintz],bytes>}]
    11: CoGBK [In(Main): KV<int,string> <- {10: KV<int,string>/GW/KV<int[varintz],bytes>}] -> [Out: CoGBK<int,string> -> {11: CoGBK<int,string>/GW/CoGBK<int[varintz],bytes>}]
    12: ParDo [In(Main): CoGBK<int,string> <- {11: CoGBK<int,string>/GW/CoGBK<int[varintz],bytes>}] -> []
    2018/03/30 16:32:16 Reading from gs://apache-beam-samples/shakespeare/1kinghenryiv.txt
    2018/03/30 16:32:16 Reading from gs://apache-beam-samples/shakespeare/1kinghenryvi.txt
    2018/03/30 16:32:17 Reading from gs://apache-beam-samples/shakespeare/2kinghenryiv.txt
    2018/03/30 16:32:17 Reading from gs://apache-beam-samples/shakespeare/2kinghenryvi.txt
    2018/03/30 16:32:18 Reading from gs://apache-beam-samples/shakespeare/3kinghenryvi.txt
    2018/03/30 16:32:18 Reading from gs://apache-beam-samples/shakespeare/allswellthatendswell.txt
    2018/03/30 16:32:19 Reading from gs://apache-beam-samples/shakespeare/antonyandcleopatra.txt
    2018/03/30 16:32:19 Reading from gs://apache-beam-samples/shakespeare/asyoulikeit.txt
    2018/03/30 16:32:19 Reading from gs://apache-beam-samples/shakespeare/comedyoferrors.txt
    2018/03/30 16:32:20 Reading from gs://apache-beam-samples/shakespeare/coriolanus.txt
    2018/03/30 16:32:20 Reading from gs://apache-beam-samples/shakespeare/cymbeline.txt
    2018/03/30 16:32:21 Reading from gs://apache-beam-samples/shakespeare/hamlet.txt
    2018/03/30 16:32:21 Reading from gs://apache-beam-samples/shakespeare/juliuscaesar.txt
    2018/03/30 16:32:22 Reading from gs://apache-beam-samples/shakespeare/kinghenryv.txt
    2018/03/30 16:32:22 Reading from gs://apache-beam-samples/shakespeare/kinghenryviii.txt
    2018/03/30 16:32:22 Reading from gs://apache-beam-samples/shakespeare/kingjohn.txt
    2018/03/30 16:32:23 Reading from gs://apache-beam-samples/shakespeare/kinglear.txt
    2018/03/30 16:32:23 Reading from gs://apache-beam-samples/shakespeare/kingrichardii.txt
    2018/03/30 16:32:24 Reading from gs://apache-beam-samples/shakespeare/kingrichardiii.txt
    2018/03/30 16:32:24 Reading from gs://apache-beam-samples/shakespeare/loverscomplaint.txt
    2018/03/30 16:32:24 Reading from gs://apache-beam-samples/shakespeare/loveslabourslost.txt
    2018/03/30 16:32:25 Reading from gs://apache-beam-samples/shakespeare/macbeth.txt
    2018/03/30 16:32:25 Reading from gs://apache-beam-samples/shakespeare/measureforemeasure.txt
    2018/03/30 16:32:26 Reading from gs://apache-beam-samples/shakespeare/merchantofvenice.txt
    2018/03/30 16:32:26 Reading from gs://apache-beam-samples/shakespeare/merrywivesofwindsor.txt
    2018/03/30 16:32:26 Reading from gs://apache-beam-samples/shakespeare/midsummersnightsdream.txt
    2018/03/30 16:32:27 Reading from gs://apache-beam-samples/shakespeare/muchadoaboutnothing.txt
    2018/03/30 16:32:27 Reading from gs://apache-beam-samples/shakespeare/othello.txt
    2018/03/30 16:32:28 Reading from gs://apache-beam-samples/shakespeare/periclesprinceoftyre.txt
    2018/03/30 16:32:28 Reading from gs://apache-beam-samples/shakespeare/rapeoflucrece.txt
    2018/03/30 16:32:29 Reading from gs://apache-beam-samples/shakespeare/romeoandjuliet.txt
    2018/03/30 16:32:29 Reading from gs://apache-beam-samples/shakespeare/sonnets.txt
    2018/03/30 16:32:29 Reading from gs://apache-beam-samples/shakespeare/tamingoftheshrew.txt
    2018/03/30 16:32:30 Reading from gs://apache-beam-samples/shakespeare/tempest.txt
    2018/03/30 16:32:30 Reading from gs://apache-beam-samples/shakespeare/timonofathens.txt
    2018/03/30 16:32:31 Reading from gs://apache-beam-samples/shakespeare/titusandronicus.txt
    2018/03/30 16:32:31 Reading from gs://apache-beam-samples/shakespeare/troilusandcressida.txt
    2018/03/30 16:32:32 Reading from gs://apache-beam-samples/shakespeare/twelfthnight.txt
    2018/03/30 16:32:32 Reading from gs://apache-beam-samples/shakespeare/twogentlemenofverona.txt
    2018/03/30 16:32:32 Reading from gs://apache-beam-samples/shakespeare/various.txt
    2018/03/30 16:32:33 Reading from gs://apache-beam-samples/shakespeare/venusandadonis.txt
    2018/03/30 16:32:33 Reading from gs://apache-beam-samples/shakespeare/winterstale-personae
    2018/03/30 16:32:33 Reading from gs://apache-beam-samples/shakespeare/winterstale.txt
    [{6: KV<string,int>/GW/KV<bytes,int[varintz]>}]
    [{10: KV<int,string>/GW/KV<int[varintz],bytes>}]
    2018/03/30 16:38:15 Pipeline:
    2018/03/30 16:38:15 Nodes: {1: []uint8/GW/bytes}
    {2: string/GW/bytes}
    {3: string/GW/bytes}
    {4: string/GW/bytes}
    {5: string/GW/bytes}
    {6: KV<string,int>/GW/KV<bytes,int[varintz]>}
    {7: CoGBK<string,int>/GW/CoGBK<bytes,int[varintz]>}
    {8: KV<string,int>/GW/KV<bytes,int[varintz]>}
    {9: string/GW/bytes}
    {10: KV<int,string>/GW/KV<int[varintz],bytes>}
    {11: CoGBK<int,string>/GW/CoGBK<int[varintz],bytes>}
    Edges: 1: Impulse [] -> [Out: []uint8 -> {1: []uint8/GW/bytes}]
    2: ParDo [In(Main): []uint8 <- {1: []uint8/GW/bytes}] -> [Out: T -> {2: string/GW/bytes}]
    3: ParDo [In(Main): string <- {2: string/GW/bytes}] -> [Out: string -> {3: string/GW/bytes}]
    4: ParDo [In(Main): string <- {3: string/GW/bytes}] -> [Out: string -> {4: string/GW/bytes}]
    5: ParDo [In(Main): string <- {4: string/GW/bytes}] -> [Out: string -> {5: string/GW/bytes}]
    6: ParDo [In(Main): string <- {5: string/GW/bytes}] -> [Out: KV<string,int> -> {6: KV<string,int>/GW/KV<bytes,int[varintz]>}]
    7: CoGBK [In(Main): KV<string,int> <- {6: KV<string,int>/GW/KV<bytes,int[varintz]>}] -> [Out: CoGBK<string,int> -> {7: CoGBK<string,int>/GW/CoGBK<bytes,int[varintz]>}]
    8: ParDo [In(Main): CoGBK<string,int> <- {7: CoGBK<string,int>/GW/CoGBK<bytes,int[varintz]>}] -> [Out: KV<string,int> -> {8: KV<string,int>/GW/KV<bytes,int[varintz]>}]
    9: ParDo [In(Main): KV<string,int> <- {8: KV<string,int>/GW/KV<bytes,int[varintz]>}] -> [Out: string -> {9: string/GW/bytes}]
    10: ParDo [In(Main): T <- {9: string/GW/bytes}] -> [Out: KV<int,T> -> {10: KV<int,string>/GW/KV<int[varintz],bytes>}]
    11: CoGBK [In(Main): KV<int,string> <- {10: KV<int,string>/GW/KV<int[varintz],bytes>}] -> [Out: CoGBK<int,string> -> {11: CoGBK<int,string>/GW/CoGBK<int[varintz],bytes>}]
    12: ParDo [In(Main): CoGBK<int,string> <- {11: CoGBK<int,string>/GW/CoGBK<int[varintz],bytes>}] -> []
    2018/03/30 16:36:47 Reading from gs://apache-beam-samples/shakespeare/kinglear.txt
    2018/03/30 16:36:48 Failed to execute job: panic: runtime error: index out of range
    goroutine 1 [running]:
    runtime/debug.Stack(0xc420121158, 0x18f4340, 0x1fad500)
            /usr/local/go/src/runtime/debug/stack.go:24 +0xa7
    github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic.func1(0xc420121c80)
            /Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:39 +0x6e
    panic(0x18f4340, 0x1fad500)
            /usr/local/go/src/runtime/panic.go:491 +0x283
    github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.Invoke(0x1f755a0, 0xc420c87140, 0xc42005e600, 0xc4201214a8, 0x0, 0x0, 0x0, 0x1473223, 0x1f755a0, 0xc420c86fc0)
            /Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/fn.go:68 +0xb0f
    github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*ParDo).invokeDataFn(0xc4201ce1c0, 0x1f755a0, 0xc420c87140, 0x0, 0x0, 0x0, 0xc42005e600, 0xc4201214a8, 0x1981e60, 0xc4201c6201, ...)
            /Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/pardo.go:180 +0x21d
    github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*ParDo).ProcessElement(0xc4201ce1c0, 0x1f755a0, 0xc420c87140, 0x1878500, 0xc420c870f8, 0x18778c0, 0xc420c87108, 0x0, 0x0, 0x0, ...)
            /Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/pardo.go:99 +0x1c4
    github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*ParDo).ProcessElement(0xc4201ce2a0, 0x1f755a0, 0xc420c86fc0, 0x1878500, 0xc420715120, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
            /Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/pardo.go:106 +0x274
    github.com/apache/beam/sdks/go/pkg/beam/runners/direct.(*CoGBK).FinishBundle(0xc42005e940, 0x1f755a0, 0xc4201c60f0, 0x1013d36, 0xc420121788)
            /Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/runners/direct/gbk.go:84 +0x247
    github.com/apache/beam/sdks/go/pkg/beam/runners/direct.(*Inject).FinishBundle(0xc4201978a0, 0x1f755a0, 0xc4201c60f0, 0x115fd40, 0xc420121840)
            /Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/runners/direct/gbk.go:125 +0x48
    github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.MultiFinishBundle(0x1f755a0, 0xc4201c60f0, 0xc4201839c0, 0x1, 0x1, 0x0, 0x0)
            /Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:58 +0x75
    github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*ParDo).FinishBundle(0xc4201ce380, 0x1f755a0, 0xc4201c60f0, 0x1489b27, 0xc42005e240)
            /Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/pardo.go:120 +0x13c
    github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.MultiFinishBundle(0x1f755a0, 0xc4201c60f0, 0xc4201839d0, 0x1, 0x1, 0x0, 0x0)
            /Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:58 +0x75
    github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*ParDo).FinishBundle(0xc4201ce460, 0x1f755a0, 0xc4201c60f0, 0x22b7558, 0x1fc15a0)
            /Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/pardo.go:120 +0x13c
    github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.MultiFinishBundle(0x1f755a0, 0xc4201c60f0, 0xc4201839e0, 0x1, 0x1, 0x0, 0x0)
            /Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:58 +0x75
    github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*ParDo).FinishBundle(0xc4201ce540, 0x1f755a0, 0xc4201c60f0, 0xc420121b30, 0x148e774)
            /Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/pardo.go:120 +0x13c
    github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.MultiFinishBundle(0x1f755a0, 0xc4201c60f0, 0xc4201839f0, 0x1, 0x1, 0x0, 0x0)
            /Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:58 +0x75
    github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*ParDo).FinishBundle(0xc4201ce620, 0x1f755a0, 0xc4201c60f0, 0x1054cd0, 0xc420032800)
            /Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/pardo.go:120 +0x13c
    github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.MultiFinishBundle(0x1f755a0, 0xc4201c60f0, 0xc420183a00, 0x1, 0x1, 0x0, 0x0)
            /Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:58 +0x75
    github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*ParDo).FinishBundle(0xc4201ce700, 0x1f755a0, 0xc4201c60f0, 0xc420121c28, 0x102afb9)
            /Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/pardo.go:120 +0x13c
    github.com/apache/beam/sdks/go/pkg/beam/runners/direct.(*Impulse).FinishBundle(0xc4201c60c0, 0x1f755a0, 0xc4201c60f0, 0xc420121c40, 0xc420088080)
            /Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/runners/direct/impulse.go:52 +0x48
    github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(Root).FinishBundle-fm(0x1f755a0, 0xc4201c60f0, 0xc420121c80, 0x0)
            /Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:118 +0x43
    github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic(0x1f755a0, 0xc4201c60f0, 0xc420121cd8, 0x0, 0x0)
            /Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:42 +0x76
    github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*Plan).Execute(0xc42019a460, 0x1f755a0, 0xc4201c60f0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
            /Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:118 +0x49f
    github.com/apache/beam/sdks/go/pkg/beam/runners/direct.Execute(0x1f75520, 0xc420086008, 0xc420184020, 0x6, 0xc4201591a0)
            /Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/runners/direct/direct.go:49 +0x2da
    github.com/apache/beam/sdks/go/pkg/beam.Run(0x1f75520, 0xc420086008, 0x19f37cb, 0x6, 0xc420184020, 0xc42017fbc0, 0xc42017f560)
            /Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/runner.go:50 +0x8d
    github.com/apache/beam/sdks/go/pkg/beam/x/beamx.Run(0x1f75520, 0xc420086008, 0xc420184020, 0xf, 0xc42017f830)
            /Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/x/beamx/run.go:42 +0x57
    main.main()
            /Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/examples/wordcount/wordcount.go:191 +0x215
    exit status 1
Attachments: minimal_wordcount.go, wordcount.go