Henning Rohde created BEAM-3978:
-----------------------------------

             Summary: Direct output doesn't work after GBK
                 Key: BEAM-3978
                 URL: https://issues.apache.org/jira/browse/BEAM-3978
             Project: Beam
          Issue Type: Bug
          Components: sdk-go
            Reporter: Henning Rohde


From the dev list:
https://lists.apache.org/thread.html/9cbe8449e4036aca522362dbb9909a535c30479fcd8df193cb01acbd@%3Cdev.beam.apache.org%3E

Snippet:

        // Concept #3: Invoke the stats.Count transform on our PCollection of
        // individual words. The Count transform returns a new PCollection of
        // key/value pairs, where each key represents a unique word in the text.
        // The associated value is the occurrence count for that word.
        singles := beam.ParDo(s, func(word string) (string, int) {
                return word, 1
        }, words)

        grouped := beam.GroupByKey(s, singles)

        counted := beam.ParDo(s, func(word string, values func(*int) bool) (string, int) {
                sum := 0
                for {
                        var i int
                        if values(&i) {
                                sum = sum + i
                        } else {
                                break
                        }
                }
                return word, sum
        }, grouped)

Produces:

2018/03/30 18:40:56 Failed to execute job: panic: runtime error: index out of range goroutine 1 [running]:
runtime/debug.Stack(0xc4207cd158, 0x18dd200, 0x1f9fe80)
        /usr/local/go/src/runtime/debug/stack.go:24 +0xa7
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic.func1(0xc4207cdc80)
        
/Users/herohde/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:39
 +0x6e
panic(0x18dd200, 0x1f9fe80)
        /usr/local/go/src/runtime/panic.go:491 +0x283
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.Invoke(0x1f681a0, 
0xc4202b2d80, 0xc4200551c0, 0xc4207cd4a8, 0x0, 0x0, 0x0, 0x1471113, 0x1f681a0, 
0xc4202b2ba0)
        
/Users/herohde/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/fn.go:68
 +0xb0f
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*ParDo).invokeDataFn(0xc4206861c0,
 0x1f681a0, 0xc4202b2d80, 0x0, 0x0, 0x0, 0xc4200551c0, 0xc4207cd4a8, 0x19672a0, 
0x1057801, ...)
        
/Users/herohde/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/pardo.go:180
 +0x21d
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*ParDo).ProcessElement(0xc4206861c0,
 0x1f681a0, 0xc4202b2d80, 0x1861d00, 0xc420979be8, 0x18610c0, 0xc420979bf8, 
0x0, 0x0, 0x0, ...)
        
/Users/herohde/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/pardo.go:99
 +0x1c4
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*ParDo).ProcessElement(0xc4206862a0,
 0x1f681a0, 0xc4202b2ba0, 0x1861d00, 0xc420691600, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
        
/Users/herohde/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/pardo.go:106
 +0x274
github.com/apache/beam/sdks/go/pkg/beam/runners/direct.(*CoGBK).FinishBundle(0xc420055580,
 0x1f681a0, 0xc42065cc60, 0x0, 0xc4207cd7d8)
        
/Users/herohde/go/src/github.com/apache/beam/sdks/go/pkg/beam/runners/direct/gbk.go:84
 +0x247
github.com/apache/beam/sdks/go/pkg/beam/runners/direct.(*Inject).FinishBundle(0xc4206722a0,
 0x1f681a0, 0xc42065cc60, 0x115efa0, 0xc4207cd840)
        
/Users/herohde/go/src/github.com/apache/beam/sdks/go/pkg/beam/runners/direct/gbk.go:125
 +0x48
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.MultiFinishBundle(0x1f681a0,
 0xc42065cc60, 0xc420661030, 0x1, 0x1, 0x0, 0x0)
        
/Users/herohde/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:58
 +0x75
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*ParDo).FinishBundle(0xc420686380,
 0x1f681a0, 0xc42065cc60, 0x1487a17, 0xc420054e00)
        
/Users/herohde/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/pardo.go:120
 +0x13c
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.MultiFinishBundle(0x1f681a0,
 0xc42065cc60, 0xc420661040, 0x1, 0x1, 0x0, 0x0)
        
/Users/herohde/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:58
 +0x75
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*ParDo).FinishBundle(0xc420686460,
 0x1f681a0, 0xc42065cc60, 0x260ea80, 0x1fb3860)
        
/Users/herohde/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/pardo.go:120
 +0x13c
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.MultiFinishBundle(0x1f681a0,
 0xc42065cc60, 0xc420661050, 0x1, 0x1, 0x0, 0x0)
        
/Users/herohde/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:58
 +0x75
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*ParDo).FinishBundle(0xc420686540,
 0x1f681a0, 0xc42065cc60, 0xc4207cdb30, 0x148c664)
        
/Users/herohde/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/pardo.go:120
 +0x13c
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.MultiFinishBundle(0x1f681a0,
 0xc42065cc60, 0xc420661060, 0x1, 0x1, 0x0, 0x0)
        
/Users/herohde/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:58
 +0x75
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*ParDo).FinishBundle(0xc420686620,
 0x1f681a0, 0xc42065cc60, 0x0, 0x0)
        
/Users/herohde/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/pardo.go:120
 +0x13c
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.MultiFinishBundle(0x1f681a0,
 0xc42065cc60, 0xc420661070, 0x1, 0x1, 0x0, 0x0)
        
/Users/herohde/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:58
 +0x75
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*ParDo).FinishBundle(0xc420686700,
 0x1f681a0, 0xc42065cc60, 0xc4207cdc28, 0x102ab59)
        
/Users/herohde/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/pardo.go:120
 +0x13c
github.com/apache/beam/sdks/go/pkg/beam/runners/direct.(*Impulse).FinishBundle(0xc42065cc30,
 0x1f681a0, 0xc42065cc60, 0xc4207cdc40, 0xc4207cdc38)
        
/Users/herohde/go/src/github.com/apache/beam/sdks/go/pkg/beam/runners/direct/impulse.go:52
 +0x48
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(Root).FinishBundle-fm(0x1f681a0,
 0xc42065cc60, 0xc4207cdc80, 0x0)
        
/Users/herohde/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:118
 +0x43
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic(0x1f681a0,
 0xc42065cc60, 0xc4207cdcd8, 0x0, 0x0)
        
/Users/herohde/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:42
 +0x76
github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*Plan).Execute(0xc420648a80,
 0x1f681a0, 0xc42065cc60, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
        
/Users/herohde/go/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:118
 +0x49f
github.com/apache/beam/sdks/go/pkg/beam/runners/direct.Execute(0x1f68120, 
0xc420012078, 0xc42000e0a0, 0x6, 0xc42063ab20)
        
/Users/herohde/go/src/github.com/apache/beam/sdks/go/pkg/beam/runners/direct/direct.go:49
 +0x2da
github.com/apache/beam/sdks/go/pkg/beam.Run(0x1f68120, 0xc420012078, 0x19d5e25, 
0x6, 0xc42000e0a0, 0xc42065c750, 0xc42065c0f0)
        
/Users/herohde/go/src/github.com/apache/beam/sdks/go/pkg/beam/runner.go:50 +0x8d
github.com/apache/beam/sdks/go/pkg/beam/x/beamx.Run(0x1f68120, 0xc420012078, 
0xc42000e0a0, 0xc, 0xc42065c3c0)
        
/Users/herohde/go/src/github.com/apache/beam/sdks/go/pkg/beam/x/beamx/run.go:42 
+0x57
main.main()
        
/Users/herohde/go/src/github.com/apache/beam/sdks/go/examples/wordcount/wordcount.go:194
 +0x215

The problem seems to be an indexing bug in the data Fn wrapper. Using an
emitter for the output after the GBK, instead of direct return values, avoids
the failure.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to