Hi dev team, I'm having a lot of trouble running any pipeline that calls GroupByKey. Maybe I'm doing something wrong, but I cannot get a pipeline that uses GroupByKey to run without crashing.
I edited wordcount.go and minimal_wordcount.go to work like my own program, and they crash as well.
Here is the snippet of code I added to minimal_wordcount (full source attached):
	// Concept #3: Instead of calling stats.Count, count the words by hand.
	// Pair each word with 1, group the pairs by word with beam.GroupByKey,
	// and sum the grouped values. The result is a PCollection of key/value
	// pairs, where each key is a unique word in the text and the value is
	// the occurrence count for that word.
	singles := beam.ParDo(s, func(word string) (string, int) {
		return word, 1
	}, words)
	grouped := beam.GroupByKey(s, singles)
	counted := beam.ParDo(s, func(word string, values func(*int) bool) (string, int) {
		// values yields the grouped counts for word one at a time and
		// returns false once they are exhausted.
		sum := 0
		var i int
		for values(&i) {
			sum += i
		}
		return word, sum
	}, grouped)

	// Use a ParDo to format our PCollection of word counts into a printable
	// string, suitable for writing to an output file. When each element
	// produces exactly one element, the DoFn can simply return it.
	formatted := beam.ParDo(s, func(w string, c int) string {
		return fmt.Sprintf("%s: %v", w, c)
	}, counted)
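For reference, here is a plain-Go sketch (no Beam dependency) of what the group-and-sum step above should compute. It assumes the iterator contract the summing DoFn relies on: each call to values(&i) stores the next grouped value in i and returns false once the group is exhausted. The helpers groupByKey and iterOver are hypothetical stand-ins for illustration, not Beam APIs; sumValues has the same body as the DoFn in the snippet.

```go
package main

import (
	"fmt"
	"sort"
)

// pair mirrors the KV<string,int> elements produced by the first ParDo.
type pair struct {
	word  string
	count int
}

// groupByKey is a hypothetical stand-in for beam.GroupByKey: it gathers
// all values that share the same key.
func groupByKey(pairs []pair) map[string][]int {
	grouped := make(map[string][]int)
	for _, p := range pairs {
		grouped[p.word] = append(grouped[p.word], p.count)
	}
	return grouped
}

// iterOver builds an iterator with the func(*int) bool shape used by the
// DoFn: each call writes the next value into *out and reports whether a
// value was available.
func iterOver(values []int) func(*int) bool {
	next := 0
	return func(out *int) bool {
		if next >= len(values) {
			return false
		}
		*out = values[next]
		next++
		return true
	}
}

// sumValues has the same signature and body as the summing DoFn.
func sumValues(word string, values func(*int) bool) (string, int) {
	sum := 0
	var i int
	for values(&i) {
		sum += i
	}
	return word, sum
}

func main() {
	words := []string{"to", "be", "or", "not", "to", "be"}

	// First ParDo: pair each word with 1.
	var singles []pair
	for _, w := range words {
		singles = append(singles, pair{w, 1})
	}

	// GroupByKey, then sum each group.
	grouped := groupByKey(singles)
	keys := make([]string, 0, len(grouped))
	for k := range grouped {
		keys = append(keys, k)
	}
	sort.Strings(keys) // map iteration order is random; sort for stable output

	for _, k := range keys {
		w, c := sumValues(k, iterOver(grouped[k]))
		fmt.Printf("%s: %v\n", w, c)
	}
}
```

Running it prints be: 2, not: 1, or: 1 and to: 2, one per line, which is the result the Beam pipeline is expected to produce for the same input.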
I have also attached the full source code and the output from running both wordcount and minimal_wordcount.
Am I just doing something wrong here? In any case, it seems inappropriate for the SDK to panic at runtime without any debugging information. The only clue is a stack trace, and that only appears if you call beamx.Run(); direct.Execute() just dies without any output.
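The wordcount trace shows exec.callNoPanic recovering the panic and calling runtime/debug.Stack, which appears to be how that run ends up reporting "panic: runtime error: index out of range goroutine 1 [running]:" followed by a stack. Below is a minimal sketch of that recover-to-error pattern. callNoPanic here is a hypothetical helper modeled only on the frame names in the trace, not the SDK's actual source; note that the failure is only visible if the caller checks the returned error.

```go
package main

import (
	"fmt"
	"runtime/debug"
)

// callNoPanic is a hypothetical helper modeled on the frame names in the
// trace (exec.callNoPanic, runtime/debug.Stack); it is not the SDK source.
// It runs fn and converts a panic into an error that carries the stack.
func callNoPanic(fn func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("panic: %v %s", r, debug.Stack())
		}
	}()
	return fn()
}

func main() {
	err := callNoPanic(func() error {
		var values []int
		idx := 3
		_ = values[idx] // index out of range: panics at runtime
		return nil
	})
	if err != nil {
		// If the caller ignores err, nothing is reported at all.
		fmt.Println("Failed to execute job:", err)
	}
}
```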
Thank you so much,
8
[{6: KV<string,int>/GW/KV<bytes,int[varintz]>}]
[{10: KV<int,string>/GW/KV<int[varintz],bytes>}]
2018/03/30 16:32:15 Pipeline:
2018/03/30 16:32:15 Nodes: {1: []uint8/GW/bytes}
{2: string/GW/bytes}
{3: string/GW/bytes}
{4: string/GW/bytes}
{5: string/GW/bytes}
{6: KV<string,int>/GW/KV<bytes,int[varintz]>}
{7: CoGBK<string,int>/GW/CoGBK<bytes,int[varintz]>}
{8: KV<string,int>/GW/KV<bytes,int[varintz]>}
{9: string/GW/bytes}
{10: KV<int,string>/GW/KV<int[varintz],bytes>}
{11: CoGBK<int,string>/GW/CoGBK<int[varintz],bytes>}
Edges: 1: Impulse [] -> [Out: []uint8 -> {1: []uint8/GW/bytes}]
2: ParDo [In(Main): []uint8 <- {1: []uint8/GW/bytes}] -> [Out: T -> {2: string/GW/bytes}]
3: ParDo [In(Main): string <- {2: string/GW/bytes}] -> [Out: string -> {3: string/GW/bytes}]
4: ParDo [In(Main): string <- {3: string/GW/bytes}] -> [Out: string -> {4: string/GW/bytes}]
5: ParDo [In(Main): string <- {4: string/GW/bytes}] -> [Out: string -> {5: string/GW/bytes}]
6: ParDo [In(Main): string <- {5: string/GW/bytes}] -> [Out: KV<string,int> -> {6: KV<string,int>/GW/KV<bytes,int[varintz]>}]
7: CoGBK [In(Main): KV<string,int> <- {6: KV<string,int>/GW/KV<bytes,int[varintz]>}] -> [Out: CoGBK<string,int> -> {7: CoGBK<string,int>/GW/CoGBK<bytes,int[varintz]>}]
8: ParDo [In(Main): CoGBK<string,int> <- {7: CoGBK<string,int>/GW/CoGBK<bytes,int[varintz]>}] -> [Out: KV<string,int> -> {8: KV<string,int>/GW/KV<bytes,int[varintz]>}]
9: ParDo [In(Main): KV<string,int> <- {8: KV<string,int>/GW/KV<bytes,int[varintz]>}] -> [Out: string -> {9: string/GW/bytes}]
10: ParDo [In(Main): T <- {9: string/GW/bytes}] -> [Out: KV<int,T> -> {10: KV<int,string>/GW/KV<int[varintz],bytes>}]
11: CoGBK [In(Main): KV<int,string> <- {10: KV<int,string>/GW/KV<int[varintz],bytes>}] -> [Out: CoGBK<int,string> -> {11: CoGBK<int,string>/GW/CoGBK<int[varintz],bytes>}]
12: ParDo [In(Main): CoGBK<int,string> <- {11: CoGBK<int,string>/GW/CoGBK<int[varintz],bytes>}] -> []
2018/03/30 16:32:16 Reading from gs://apache-beam-samples/shakespeare/1kinghenryiv.txt
2018/03/30 16:32:16 Reading from gs://apache-beam-samples/shakespeare/1kinghenryvi.txt
2018/03/30 16:32:17 Reading from gs://apache-beam-samples/shakespeare/2kinghenryiv.txt
2018/03/30 16:32:17 Reading from gs://apache-beam-samples/shakespeare/2kinghenryvi.txt
2018/03/30 16:32:18 Reading from gs://apache-beam-samples/shakespeare/3kinghenryvi.txt
2018/03/30 16:32:18 Reading from gs://apache-beam-samples/shakespeare/allswellthatendswell.txt
2018/03/30 16:32:19 Reading from gs://apache-beam-samples/shakespeare/antonyandcleopatra.txt
2018/03/30 16:32:19 Reading from gs://apache-beam-samples/shakespeare/asyoulikeit.txt
2018/03/30 16:32:19 Reading from gs://apache-beam-samples/shakespeare/comedyoferrors.txt
2018/03/30 16:32:20 Reading from gs://apache-beam-samples/shakespeare/coriolanus.txt
2018/03/30 16:32:20 Reading from gs://apache-beam-samples/shakespeare/cymbeline.txt
2018/03/30 16:32:21 Reading from gs://apache-beam-samples/shakespeare/hamlet.txt
2018/03/30 16:32:21 Reading from gs://apache-beam-samples/shakespeare/juliuscaesar.txt
2018/03/30 16:32:22 Reading from gs://apache-beam-samples/shakespeare/kinghenryv.txt
2018/03/30 16:32:22 Reading from gs://apache-beam-samples/shakespeare/kinghenryviii.txt
2018/03/30 16:32:22 Reading from gs://apache-beam-samples/shakespeare/kingjohn.txt
2018/03/30 16:32:23 Reading from gs://apache-beam-samples/shakespeare/kinglear.txt
2018/03/30 16:32:23 Reading from gs://apache-beam-samples/shakespeare/kingrichardii.txt
2018/03/30 16:32:24 Reading from gs://apache-beam-samples/shakespeare/kingrichardiii.txt
2018/03/30 16:32:24 Reading from gs://apache-beam-samples/shakespeare/loverscomplaint.txt
2018/03/30 16:32:24 Reading from gs://apache-beam-samples/shakespeare/loveslabourslost.txt
2018/03/30 16:32:25 Reading from gs://apache-beam-samples/shakespeare/macbeth.txt
2018/03/30 16:32:25 Reading from gs://apache-beam-samples/shakespeare/measureforemeasure.txt
2018/03/30 16:32:26 Reading from gs://apache-beam-samples/shakespeare/merchantofvenice.txt
2018/03/30 16:32:26 Reading from gs://apache-beam-samples/shakespeare/merrywivesofwindsor.txt
2018/03/30 16:32:26 Reading from gs://apache-beam-samples/shakespeare/midsummersnightsdream.txt
2018/03/30 16:32:27 Reading from gs://apache-beam-samples/shakespeare/muchadoaboutnothing.txt
2018/03/30 16:32:27 Reading from gs://apache-beam-samples/shakespeare/othello.txt
2018/03/30 16:32:28 Reading from gs://apache-beam-samples/shakespeare/periclesprinceoftyre.txt
2018/03/30 16:32:28 Reading from gs://apache-beam-samples/shakespeare/rapeoflucrece.txt
2018/03/30 16:32:29 Reading from gs://apache-beam-samples/shakespeare/romeoandjuliet.txt
2018/03/30 16:32:29 Reading from gs://apache-beam-samples/shakespeare/sonnets.txt
2018/03/30 16:32:29 Reading from gs://apache-beam-samples/shakespeare/tamingoftheshrew.txt
2018/03/30 16:32:30 Reading from gs://apache-beam-samples/shakespeare/tempest.txt
2018/03/30 16:32:30 Reading from gs://apache-beam-samples/shakespeare/timonofathens.txt
2018/03/30 16:32:31 Reading from gs://apache-beam-samples/shakespeare/titusandronicus.txt
2018/03/30 16:32:31 Reading from gs://apache-beam-samples/shakespeare/troilusandcressida.txt
2018/03/30 16:32:32 Reading from gs://apache-beam-samples/shakespeare/twelfthnight.txt
2018/03/30 16:32:32 Reading from gs://apache-beam-samples/shakespeare/twogentlemenofverona.txt
2018/03/30 16:32:32 Reading from gs://apache-beam-samples/shakespeare/various.txt
2018/03/30 16:32:33 Reading from gs://apache-beam-samples/shakespeare/venusandadonis.txt
2018/03/30 16:32:33 Reading from gs://apache-beam-samples/shakespeare/winterstale-personae
2018/03/30 16:32:33 Reading from gs://apache-beam-samples/shakespeare/winterstale.txt
[{6: KV<string,int>/GW/KV<bytes,int[varintz]>}]
[{10: KV<int,string>/GW/KV<int[varintz],bytes>}]
2018/03/30 16:38:15 Pipeline:
2018/03/30 16:38:15 Nodes: {1: []uint8/GW/bytes}
{2: string/GW/bytes}
{3: string/GW/bytes}
{4: string/GW/bytes}
{5: string/GW/bytes}
{6: KV<string,int>/GW/KV<bytes,int[varintz]>}
{7: CoGBK<string,int>/GW/CoGBK<bytes,int[varintz]>}
{8: KV<string,int>/GW/KV<bytes,int[varintz]>}
{9: string/GW/bytes}
{10: KV<int,string>/GW/KV<int[varintz],bytes>}
{11: CoGBK<int,string>/GW/CoGBK<int[varintz],bytes>}
Edges: 1: Impulse [] -> [Out: []uint8 -> {1: []uint8/GW/bytes}]
2: ParDo [In(Main): []uint8 <- {1: []uint8/GW/bytes}] -> [Out: T -> {2: string/GW/bytes}]
3: ParDo [In(Main): string <- {2: string/GW/bytes}] -> [Out: string -> {3: string/GW/bytes}]
4: ParDo [In(Main): string <- {3: string/GW/bytes}] -> [Out: string -> {4: string/GW/bytes}]
5: ParDo [In(Main): string <- {4: string/GW/bytes}] -> [Out: string -> {5: string/GW/bytes}]
6: ParDo [In(Main): string <- {5: string/GW/bytes}] -> [Out: KV<string,int> -> {6: KV<string,int>/GW/KV<bytes,int[varintz]>}]
7: CoGBK [In(Main): KV<string,int> <- {6: KV<string,int>/GW/KV<bytes,int[varintz]>}] -> [Out: CoGBK<string,int> -> {7: CoGBK<string,int>/GW/CoGBK<bytes,int[varintz]>}]
8: ParDo [In(Main): CoGBK<string,int> <- {7: CoGBK<string,int>/GW/CoGBK<bytes,int[varintz]>}] -> [Out: KV<string,int> -> {8: KV<string,int>/GW/KV<bytes,int[varintz]>}]
9: ParDo [In(Main): KV<string,int> <- {8: KV<string,int>/GW/KV<bytes,int[varintz]>}] -> [Out: string -> {9: string/GW/bytes}]
10: ParDo [In(Main): T <- {9: string/GW/bytes}] -> [Out: KV<int,T> -> {10: KV<int,string>/GW/KV<int[varintz],bytes>}]
11: CoGBK [In(Main): KV<int,string> <- {10: KV<int,string>/GW/KV<int[varintz],bytes>}] -> [Out: CoGBK<int,string> -> {11: CoGBK<int,string>/GW/CoGBK<int[varintz],bytes>}]
12: ParDo [In(Main): CoGBK<int,string> <- {11: CoGBK<int,string>/GW/CoGBK<int[varintz],bytes>}] -> []
2018/03/30 16:36:47 Reading from gs://apache-beam-samples/shakespeare/kinglear.txt
2018/03/30 16:36:48 Failed to execute job: panic: runtime error: index out of range goroutine 1 [running]:
runtime/debug.Stack(0xc420121158, 0x18f4340, 0x1fad500)
	/usr/local/go/src/runtime/debug/stack.go:24 +0xa7
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic.func1(0xc420121c80)
	/Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:39 +0x6e
panic(0x18f4340, 0x1fad500)
	/usr/local/go/src/runtime/panic.go:491 +0x283
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.Invoke(0x1f755a0, 0xc420c87140, 0xc42005e600, 0xc4201214a8, 0x0, 0x0, 0x0, 0x1473223, 0x1f755a0, 0xc420c86fc0)
	/Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/fn.go:68 +0xb0f
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*ParDo).invokeDataFn(0xc4201ce1c0, 0x1f755a0, 0xc420c87140, 0x0, 0x0, 0x0, 0xc42005e600, 0xc4201214a8, 0x1981e60, 0xc4201c6201, ...)
	/Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/pardo.go:180 +0x21d
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*ParDo).ProcessElement(0xc4201ce1c0, 0x1f755a0, 0xc420c87140, 0x1878500, 0xc420c870f8, 0x18778c0, 0xc420c87108, 0x0, 0x0, 0x0, ...)
	/Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/pardo.go:99 +0x1c4
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*ParDo).ProcessElement(0xc4201ce2a0, 0x1f755a0, 0xc420c86fc0, 0x1878500, 0xc420715120, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
	/Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/pardo.go:106 +0x274
github.com/apache/beam/sdks/go/pkg/beam/runners/direct.(*CoGBK).FinishBundle(0xc42005e940, 0x1f755a0, 0xc4201c60f0, 0x1013d36, 0xc420121788)
	/Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/runners/direct/gbk.go:84 +0x247
github.com/apache/beam/sdks/go/pkg/beam/runners/direct.(*Inject).FinishBundle(0xc4201978a0, 0x1f755a0, 0xc4201c60f0, 0x115fd40, 0xc420121840)
	/Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/runners/direct/gbk.go:125 +0x48
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.MultiFinishBundle(0x1f755a0, 0xc4201c60f0, 0xc4201839c0, 0x1, 0x1, 0x0, 0x0)
	/Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:58 +0x75
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*ParDo).FinishBundle(0xc4201ce380, 0x1f755a0, 0xc4201c60f0, 0x1489b27, 0xc42005e240)
	/Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/pardo.go:120 +0x13c
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.MultiFinishBundle(0x1f755a0, 0xc4201c60f0, 0xc4201839d0, 0x1, 0x1, 0x0, 0x0)
	/Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:58 +0x75
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*ParDo).FinishBundle(0xc4201ce460, 0x1f755a0, 0xc4201c60f0, 0x22b7558, 0x1fc15a0)
	/Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/pardo.go:120 +0x13c
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.MultiFinishBundle(0x1f755a0, 0xc4201c60f0, 0xc4201839e0, 0x1, 0x1, 0x0, 0x0)
	/Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:58 +0x75
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*ParDo).FinishBundle(0xc4201ce540, 0x1f755a0, 0xc4201c60f0, 0xc420121b30, 0x148e774)
	/Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/pardo.go:120 +0x13c
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.MultiFinishBundle(0x1f755a0, 0xc4201c60f0, 0xc4201839f0, 0x1, 0x1, 0x0, 0x0)
	/Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:58 +0x75
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*ParDo).FinishBundle(0xc4201ce620, 0x1f755a0, 0xc4201c60f0, 0x1054cd0, 0xc420032800)
	/Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/pardo.go:120 +0x13c
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.MultiFinishBundle(0x1f755a0, 0xc4201c60f0, 0xc420183a00, 0x1, 0x1, 0x0, 0x0)
	/Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:58 +0x75
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*ParDo).FinishBundle(0xc4201ce700, 0x1f755a0, 0xc4201c60f0, 0xc420121c28, 0x102afb9)
	/Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/pardo.go:120 +0x13c
github.com/apache/beam/sdks/go/pkg/beam/runners/direct.(*Impulse).FinishBundle(0xc4201c60c0, 0x1f755a0, 0xc4201c60f0, 0xc420121c40, 0xc420088080)
	/Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/runners/direct/impulse.go:52 +0x48
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(Root).FinishBundle-fm(0x1f755a0, 0xc4201c60f0, 0xc420121c80, 0x0)
	/Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:118 +0x43
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic(0x1f755a0, 0xc4201c60f0, 0xc420121cd8, 0x0, 0x0)
	/Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:42 +0x76
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*Plan).Execute(0xc42019a460, 0x1f755a0, 0xc4201c60f0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
	/Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:118 +0x49f
github.com/apache/beam/sdks/go/pkg/beam/runners/direct.Execute(0x1f75520, 0xc420086008, 0xc420184020, 0x6, 0xc4201591a0)
	/Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/runners/direct/direct.go:49 +0x2da
github.com/apache/beam/sdks/go/pkg/beam.Run(0x1f75520, 0xc420086008, 0x19f37cb, 0x6, 0xc420184020, 0xc42017fbc0, 0xc42017f560)
	/Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/runner.go:50 +0x8d
github.com/apache/beam/sdks/go/pkg/beam/x/beamx.Run(0x1f75520, 0xc420086008, 0xc420184020, 0xf, 0xc42017f830)
	/Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/pkg/beam/x/beamx/run.go:42 +0x57
main.main()
	/Users/pwg/tokentransit/src/gopath/src/github.com/apache/beam/sdks/go/examples/wordcount/wordcount.go:191 +0x215
exit status 1
minimal_wordcount.go
wordcount.go
