cbix opened a new issue, #25154:
URL: https://github.com/apache/beam/issues/25154

   ### What happened?
   
   With the Apache Beam Go SDK 2.44.0, having a stateful DoFn in your pipeline 
causes a panic at runtime with the Go direct runner:
   ```
   panic: Failed to execute job: panic: runtime error: invalid memory address 
or nil pointer dereference
   ```
   <details><summary><b>Full trace</b></summary><pre>
   panic: Failed to execute job: panic: runtime error: invalid memory address 
or nil pointer dereference
   Full error:
   while executing Process for Plan[plan]:
   5: Impulse[0]
   1: ParDo[main.logFn] Out:[] Sig: func(context.Context, string)
   2: ParDo[main.valueStateFn] Out:[1] Sig: func(state.Provider, string, int) 
string
   3: ParDo[main.main.func1] Out:[2] Sig: func(string, func(string, int))
   4: ParDo[beam.createFn] Out:[3] Sig: func([]uint8, func(typex.T)) error
        caused by:
   panic: runtime error: invalid memory address or nil pointer dereference 
goroutine 1 [running]:
   runtime/debug.Stack()
        /usr/lib/go/src/runtime/debug/stack.go:24 +0x65
   
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.callNoPanic.func1()
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/util.go:58
 +0xa5
   panic({0xece0c0, 0x18b11f0})
        /usr/lib/go/src/runtime/panic.go:884 +0x212
   
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*invoker).Invoke(0xc000554140,
 {0x11afcf0?, 0xc000125900}, {0x0, 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, ...}, 
...)
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/fn.go:196
 +0x519
   
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).invokeProcessFn(0xc0001a0700,
 {0x11afcf0, 0xc000125900}, {0x0, 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, ...}, 
...)
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:359
 +0x25e
   
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).processSingleWindow(0xc0001a0700,
 0xc000162480)
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:178
 +0x9b
   
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).processMainInput(0x10?,
 0x11afcf0?)
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:156
 +0x65
   
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).ProcessElement(0xc0001a0700,
 {0x5?, 0xc00056bf90?}, 0xc0000f2dd8, {0x0, 0x0, 0x0})
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:141
 +0x125
   
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec/optimized.(*emitNative).invokeStringInt(0xc0000f2d80,
 {0xc0004908e0?, 0x4?}, 0xc0003de488?)
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/optimized/emitters.go:2681
 +0x1bc
   main.main.func1({0xc0004908e0?, 0x0?}, 0x0?)
        /home/florian/enlyze/beam/state.go:45 +0x26
   reflect.Value.call({0xe6e3a0?, 0x1092b08?, 0x450cf2?}, {0x102eaf6, 0x4}, 
{0xc0001618f0, 0x2, 0x50ec91?})
        /usr/lib/go/src/reflect/value.go:584 +0x8c5
   reflect.Value.Call({0xe6e3a0?, 0x1092b08?, 0x10000004efe25?}, 
{0xc0001618f0?, 0x0?, 0x7fb4423aa5b8?})
        /usr/lib/go/src/reflect/value.go:368 +0xbc
   
github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx.(*reflectFunc).Call(0xc0006048b8,
 {0xc00061cc80?, 0x98?, 0x18d1aa0?})
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/util/reflectx/call.go:87
 +0x59
   
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*invoker).initCall.func46({0x0,
 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, 0x1}, 0x0?)
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/fn_arity.go:307
 +0x94
   
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*invoker).Invoke(0xc000554280,
 {0x11afcf0?, 0xc0001258c0}, {0x0, 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, ...}, 
...)
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/fn.go:252
 +0xe23
   
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).invokeProcessFn(0xc0001a0800,
 {0x11afcf0, 0xc0001258c0}, {0x0, 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, ...}, 
...)
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:359
 +0x25e
   
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).processSingleWindow(0xc0001a0800,
 0xc0001623f0)
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:178
 +0x9b
   
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).processMainInput(0x18d1aa0?,
 0x11afcf0?)
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:156
 +0x65
   
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).ProcessElement(0xc0001a0800,
 {0xc0001618c0?, 0xe4a860?}, 0xc0000f2e98, {0x0, 0x0, 0x0})
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:141
 +0x125
   
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec/optimized.(*emitNative).invokeTypex_T(0xc0000f2e40,
 {0xe4a860?, 0xc000040c20?})
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/optimized/emitters.go:12121
 +0x18a
   
github.com/apache/beam/sdks/v2/go/pkg/beam.(*createFn).ProcessElement(0xc0001608d0,
 {0x0?, 0x8?, 0x40f35f?}, 0xc000040bf0)
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/create.go:121
 +0x105
   
github.com/apache/beam/sdks/v2/go/pkg/beam.wrapMakerCreateFn.func1({0x19016d0?, 
0x1000000000000?, 0x7fb4423b4b28?}, 0x0?)
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/beam.shims.go:70
 +0x31
   
github.com/apache/beam/sdks/v2/go/pkg/beam.(*callerSliceOfByteEmitTypex۰TГError).Call2x1(0x203000?,
 {0xe34060?, 0xc000604c30?}, {0xe47960?, 0xc000040bf0?})
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/beam.shims.go:149
 +0x5e
   
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*invoker).initCall.func12({0x0,
 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, 0x1}, 0x0?)
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/fn_arity.go:103
 +0x9b
   
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*invoker).Invoke(0xc0005543c0,
 {0x11afcf0?, 0xc000125880}, {0x0, 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, ...}, 
...)
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/fn.go:252
 +0xe23
   
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).invokeProcessFn(0xc0001a0900,
 {0x11afcf0, 0xc000125880}, {0x0, 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, ...}, 
...)
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:359
 +0x25e
   
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).processSingleWindow(0xc0001a0900,
 0xc000162360)
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:178
 +0x9b
   
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).processMainInput(0x18?,
 0x11afcf0?)
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:156
 +0x65
   
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).ProcessElement(0xc0001a0900,
 {0x0?, 0x0?}, 0xc0000e09a0, {0x0, 0x0, 0x0})
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:141
 +0x125
   
github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct.(*Impulse).Process(0xc000161800,
 {0x11afcf0, 0xc0001256c0})
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/runners/direct/impulse.go:52
 +0xee
   
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.callNoPanic({0x11afcf0?,
 0xc0001256c0?}, 0xc0003dfb10?)
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/util.go:62
 +0x6c
   
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*Plan).Execute(0xc0000e0930,
 {0x11afcf0, 0xc0001256c0}, {0x0, 0x0}, {{0x0?, 0x0?}, {0x0?, 0x0?}})
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/plan.go:129
 +0x394
   
github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct.Execute({0x11afb68?, 
0xc000046128?}, 0xc000600218)
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/runners/direct/direct.go:71
 +0x89f
   github.com/apache/beam/sdks/v2/go/pkg/beam.Run({0x11afb68, 0xc000046128}, 
{0x10308b4, 0x6}, 0x1?)
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/runner.go:50
 +0xc2
   github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx.Run({0x11afb68?, 
0xc000046128?}, 0xe5e920?)
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/x/beamx/run.go:57
 +0x4d
   main.main()
        /home/florian/enlyze/beam/state.go:51 +0x2ec
   
   
   goroutine 1 [running]:
   github.com/apache/beam/sdks/v2/go/pkg/beam/log.Fatalf({0x11afb68, 
0xc000046128}, {0x1042358?, 0x1092b00?}, {0xc0003dff00?, 0x0?, 0x0?})
        
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/log/log.go:153
 +0xa5
   main.main()
        /home/florian/enlyze/beam/state.go:52 +0x33e
   exit status 2
   </pre></details>
   
   This leaves me wondering whether state is supported at all in the Go direct 
runner.
   
   The code to reproduce this is based on the [`primitives/state` integration 
test 
`ValueStateParDo`](https://github.com/apache/beam/blob/a96afe2c57c45a869a622086eaa4f81305f06e72/sdks/go/test/integration/primitives/state.go#L49-L93):
   ```go
   package main
   
   import (
        "context"
        "flag"
        "fmt"
   
        "github.com/apache/beam/sdks/v2/go/pkg/beam"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
   )
   
   func logFn(ctx context.Context, l string) {
        log.Infoln(ctx, l)
   }
   
   type valueStateFn struct {
        State1 state.Value[int]
   }
   
   func (f *valueStateFn) ProcessElement(s state.Provider, w string, c int) string {
        i, ok, err := f.State1.Read(s)
        if err != nil {
                panic(err)
        }
        if !ok {
                i = 1
        }
        err = f.State1.Write(s, i+1)
        if err != nil {
                panic(err)
        }
        return fmt.Sprintf("%s: %v", w, i)
   }
   
   func main() {
        flag.Parse()
        beam.Init()
   
        p, s := beam.NewPipelineWithRoot()
   
        in := beam.Create(s, "apple", "pear", "peach", "apple", "apple", "pear")
        keyed := beam.ParDo(s, func(w string, emit func(string, int)) {
                emit(w, 1)
        }, in)
        counts := beam.ParDo(s, &valueStateFn{}, keyed)
        beam.ParDo0(s, logFn, counts)
   
        ctx := context.Background()
        if err := beamx.Run(ctx, p); err != nil {
                log.Fatalf(ctx, "Failed to execute job: %v", err)
        }
   }
   ```
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [ ] Component: Java SDK
   - [X] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to