cbix opened a new issue, #25154:
URL: https://github.com/apache/beam/issues/25154
### What happened?
With the Apache Beam Go SDK 2.44.0, having a stateful DoFn in your pipeline
causes a panic at runtime with the Go direct runner:
```
panic: Failed to execute job: panic: runtime error: invalid memory address
or nil pointer dereference
```
<details><summary><b>Full trace</b></summary><pre>
panic: Failed to execute job: panic: runtime error: invalid memory address
or nil pointer dereference
Full error:
while executing Process for Plan[plan]:
5: Impulse[0]
1: ParDo[main.logFn] Out:[] Sig: func(context.Context, string)
2: ParDo[main.valueStateFn] Out:[1] Sig: func(state.Provider, string, int)
string
3: ParDo[main.main.func1] Out:[2] Sig: func(string, func(string, int))
4: ParDo[beam.createFn] Out:[3] Sig: func([]uint8, func(typex.T)) error
caused by:
panic: runtime error: invalid memory address or nil pointer dereference
goroutine 1 [running]:
runtime/debug.Stack()
/usr/lib/go/src/runtime/debug/stack.go:24 +0x65
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.callNoPanic.func1()
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/util.go:58
+0xa5
panic({0xece0c0, 0x18b11f0})
/usr/lib/go/src/runtime/panic.go:884 +0x212
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*invoker).Invoke(0xc000554140,
{0x11afcf0?, 0xc000125900}, {0x0, 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, ...},
...)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/fn.go:196
+0x519
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).invokeProcessFn(0xc0001a0700,
{0x11afcf0, 0xc000125900}, {0x0, 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, ...},
...)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:359
+0x25e
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).processSingleWindow(0xc0001a0700,
0xc000162480)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:178
+0x9b
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).processMainInput(0x10?,
0x11afcf0?)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:156
+0x65
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).ProcessElement(0xc0001a0700,
{0x5?, 0xc00056bf90?}, 0xc0000f2dd8, {0x0, 0x0, 0x0})
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:141
+0x125
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec/optimized.(*emitNative).invokeStringInt(0xc0000f2d80,
{0xc0004908e0?, 0x4?}, 0xc0003de488?)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/optimized/emitters.go:2681
+0x1bc
main.main.func1({0xc0004908e0?, 0x0?}, 0x0?)
/home/florian/enlyze/beam/state.go:45 +0x26
reflect.Value.call({0xe6e3a0?, 0x1092b08?, 0x450cf2?}, {0x102eaf6, 0x4},
{0xc0001618f0, 0x2, 0x50ec91?})
/usr/lib/go/src/reflect/value.go:584 +0x8c5
reflect.Value.Call({0xe6e3a0?, 0x1092b08?, 0x10000004efe25?},
{0xc0001618f0?, 0x0?, 0x7fb4423aa5b8?})
/usr/lib/go/src/reflect/value.go:368 +0xbc
github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx.(*reflectFunc).Call(0xc0006048b8,
{0xc00061cc80?, 0x98?, 0x18d1aa0?})
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/util/reflectx/call.go:87
+0x59
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*invoker).initCall.func46({0x0,
0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, 0x1}, 0x0?)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/fn_arity.go:307
+0x94
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*invoker).Invoke(0xc000554280,
{0x11afcf0?, 0xc0001258c0}, {0x0, 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, ...},
...)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/fn.go:252
+0xe23
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).invokeProcessFn(0xc0001a0800,
{0x11afcf0, 0xc0001258c0}, {0x0, 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, ...},
...)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:359
+0x25e
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).processSingleWindow(0xc0001a0800,
0xc0001623f0)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:178
+0x9b
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).processMainInput(0x18d1aa0?,
0x11afcf0?)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:156
+0x65
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).ProcessElement(0xc0001a0800,
{0xc0001618c0?, 0xe4a860?}, 0xc0000f2e98, {0x0, 0x0, 0x0})
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:141
+0x125
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec/optimized.(*emitNative).invokeTypex_T(0xc0000f2e40,
{0xe4a860?, 0xc000040c20?})
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/optimized/emitters.go:12121
+0x18a
github.com/apache/beam/sdks/v2/go/pkg/beam.(*createFn).ProcessElement(0xc0001608d0,
{0x0?, 0x8?, 0x40f35f?}, 0xc000040bf0)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/create.go:121
+0x105
github.com/apache/beam/sdks/v2/go/pkg/beam.wrapMakerCreateFn.func1({0x19016d0?,
0x1000000000000?, 0x7fb4423b4b28?}, 0x0?)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/beam.shims.go:70
+0x31
github.com/apache/beam/sdks/v2/go/pkg/beam.(*callerSliceOfByteEmitTypex۰TГError).Call2x1(0x203000?,
{0xe34060?, 0xc000604c30?}, {0xe47960?, 0xc000040bf0?})
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/beam.shims.go:149
+0x5e
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*invoker).initCall.func12({0x0,
0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, 0x1}, 0x0?)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/fn_arity.go:103
+0x9b
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*invoker).Invoke(0xc0005543c0,
{0x11afcf0?, 0xc000125880}, {0x0, 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, ...},
...)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/fn.go:252
+0xe23
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).invokeProcessFn(0xc0001a0900,
{0x11afcf0, 0xc000125880}, {0x0, 0x0, 0x0, 0x0, 0x0}, {0x18b0cb0, 0x1, ...},
...)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:359
+0x25e
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).processSingleWindow(0xc0001a0900,
0xc000162360)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:178
+0x9b
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).processMainInput(0x18?,
0x11afcf0?)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:156
+0x65
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*ParDo).ProcessElement(0xc0001a0900,
{0x0?, 0x0?}, 0xc0000e09a0, {0x0, 0x0, 0x0})
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/pardo.go:141
+0x125
github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct.(*Impulse).Process(0xc000161800,
{0x11afcf0, 0xc0001256c0})
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/runners/direct/impulse.go:52
+0xee
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.callNoPanic({0x11afcf0?,
0xc0001256c0?}, 0xc0003dfb10?)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/util.go:62
+0x6c
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*Plan).Execute(0xc0000e0930,
{0x11afcf0, 0xc0001256c0}, {0x0, 0x0}, {{0x0?, 0x0?}, {0x0?, 0x0?}})
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/core/runtime/exec/plan.go:129
+0x394
github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct.Execute({0x11afb68?,
0xc000046128?}, 0xc000600218)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/runners/direct/direct.go:71
+0x89f
github.com/apache/beam/sdks/v2/go/pkg/beam.Run({0x11afb68, 0xc000046128},
{0x10308b4, 0x6}, 0x1?)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/runner.go:50
+0xc2
github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx.Run({0x11afb68?,
0xc000046128?}, 0xe5e920?)
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/x/beamx/run.go:57
+0x4d
main.main()
/home/florian/enlyze/beam/state.go:51 +0x2ec
goroutine 1 [running]:
github.com/apache/beam/sdks/v2/go/pkg/beam/log.Fatalf({0x11afb68,
0xc000046128}, {0x1042358?, 0x1092b00?}, {0xc0003dff00?, 0x0?, 0x0?})
/home/florian/go/pkg/mod/github.com/apache/beam/sdks/[email protected]/go/pkg/beam/log/log.go:153
+0xa5
main.main()
/home/florian/enlyze/beam/state.go:52 +0x33e
exit status 2
</pre></details>
This leaves me wondering whether state is supported at all in the Go direct
runner.
The code to reproduce this is based on the [`primitives/state` integration
test
`ValueStateParDo`](https://github.com/apache/beam/blob/a96afe2c57c45a869a622086eaa4f81305f06e72/sdks/go/test/integration/primitives/state.go#L49-L93):
```go
package main
import (
"context"
"flag"
"fmt"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)
// logFn is the final step of the repro pipeline: it writes each incoming
// string l to the Beam logger at info level.
func logFn(ctx context.Context, l string) {
log.Infoln(ctx, l)
}
// valueStateFn is the stateful DoFn used to trigger the reported panic.
type valueStateFn struct {
// State1 is the int-valued state cell read and written by ProcessElement.
State1 state.Value[int]
}
func (f *valueStateFn) ProcessElement(s state.Provider, w string, c int)
string {
i, ok, err := f.State1.Read(s)
if err != nil {
panic(err)
}
if !ok {
i = 1
}
err = f.State1.Write(s, i+1)
if err != nil {
panic(err)
}
return fmt.Sprintf("%s: %v", w, i)
}
// main builds and runs the pipeline that reproduces the panic:
// Create -> ParDo (pair each word with 1) -> ParDo (valueStateFn) -> ParDo0 (logFn).
// If the run returns an error, it is reported with log.Fatalf.
func main() {
flag.Parse()
beam.Init()
p, s := beam.NewPipelineWithRoot()
// Six input words; "apple" and "pear" occur more than once.
in := beam.Create(s, "apple", "pear", "peach", "apple", "apple", "pear")
// Emit (word, 1) for every word so the next step receives (string, int) elements.
keyed := beam.ParDo(s, func(w string, emit func(string, int)) {
emit(w, 1)
}, in)
// Apply the stateful DoFn, then log each string it returns.
counts := beam.ParDo(s, &valueStateFn{}, keyed)
beam.ParDo0(s, logFn, counts)
ctx := context.Background()
if err := beamx.Run(ctx, p); err != nil {
log.Fatalf(ctx, "Failed to execute job: %v", err)
}
}
```
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [ ] Component: Python SDK
- [ ] Component: Java SDK
- [X] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]