[
https://issues.apache.org/jira/browse/BEAM-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16550056#comment-16550056
]
Bill Neubauer commented on BEAM-4832:
-------------------------------------
The comments for the mutex should indicate that it protects the readers and
writers maps. That will help future reviewers maintain the locking invariants.
> Concurrent Writes in Data channels
> ----------------------------------
>
> Key: BEAM-4832
> URL: https://issues.apache.org/jira/browse/BEAM-4832
> Project: Beam
> Issue Type: Bug
> Components: sdk-go
> Reporter: Robert Burke
> Assignee: Robert Burke
> Priority: Major
> Time Spent: 20m
> Remaining Estimate: 0h
>
> A user was having issue with streaming Go pipelines on Dataflow.
> Investigation yeilded the panic below, which triggered concurrent
> modifications of the data channel maps.
>
> The fix is properly guarding all writes to the Data channel maps, in
> particular:
>
> func
> ([c|https://cs.corp.google.com/piper///depot/google3/third_party/golang/apache_beam/pkg/beam/core/runtime/harness/datamgr.go?l=212&gs=kythe%253A%252F%252Fgoogle3%253Flang%253Dgo%253Fpath%253Dthird_party%252Fgolang%252Fapache_beam%252Fpkg%252Fbeam%252Fcore%252Fruntime%252Fharness%252Fharness%2523param%252520DataChannel.removeReader%25253Ac&gsn=c&ct=xref_usages]
>
> *[DataChannel|https://cs.corp.google.com/piper///depot/google3/third_party/golang/apache_beam/pkg/beam/core/runtime/harness/datamgr.go?l=87&ct=xref_jump_to_def&gsn=DataChannel&rcl=205012539])
>
> [removeReader|https://cs.corp.google.com/piper///depot/google3/third_party/golang/apache_beam/pkg/beam/core/runtime/harness/datamgr.go?l=212&gs=kythe%253A%252F%252Fgoogle3%253Flang%253Dgo%253Fpath%253Dthird_party%252Fgolang%252Fapache_beam%252Fpkg%252Fbeam%252Fcore%252Fruntime%252Fharness%252Fharness%2523method%252520DataChannel.removeReader&gsn=removeReader&ct=xref_usages]([id|https://cs.corp.google.com/piper///depot/google3/third_party/golang/apache_beam/pkg/beam/core/runtime/harness/datamgr.go?l=212&gs=kythe%253A%252F%252Fgoogle3%253Flang%253Dgo%253Fpath%253Dthird_party%252Fgolang%252Fapache_beam%252Fpkg%252Fbeam%252Fcore%252Fruntime%252Fharness%252Fharness%2523param%252520DataChannel.removeReader%25253Aid&gsn=id&ct=xref_usages]
> string) {
> delete([c|https://cs.corp.google.com/piper///depot/google3/third_party/golang/apache_beam/pkg/beam/core/runtime/harness/datamgr.go?l=212&ct=xref_jump_to_def&gsn=c&rcl=205012539].[readers|https://cs.corp.google.com/piper///depot/google3/third_party/golang/apache_beam/pkg/beam/core/runtime/harness/datamgr.go?l=93&ct=xref_jump_to_def&gsn=readers&rcl=205012539],
>
> [id|https://cs.corp.google.com/piper///depot/google3/third_party/golang/apache_beam/pkg/beam/core/runtime/harness/datamgr.go?l=212&ct=xref_jump_to_def&gsn=id&rcl=205012539])}
> Should be
>
> func
> ([c|https://cs.corp.google.com/piper///depot/google3/third_party/golang/apache_beam/pkg/beam/core/runtime/harness/datamgr.go?l=212&gs=kythe%253A%252F%252Fgoogle3%253Flang%253Dgo%253Fpath%253Dthird_party%252Fgolang%252Fapache_beam%252Fpkg%252Fbeam%252Fcore%252Fruntime%252Fharness%252Fharness%2523param%252520DataChannel.removeReader%25253Ac&gsn=c&ct=xref_usages]
>
> *[DataChannel|https://cs.corp.google.com/piper///depot/google3/third_party/golang/apache_beam/pkg/beam/core/runtime/harness/datamgr.go?l=87&ct=xref_jump_to_def&gsn=DataChannel&rcl=205012539])
>
> [removeReader|https://cs.corp.google.com/piper///depot/google3/third_party/golang/apache_beam/pkg/beam/core/runtime/harness/datamgr.go?l=212&gs=kythe%253A%252F%252Fgoogle3%253Flang%253Dgo%253Fpath%253Dthird_party%252Fgolang%252Fapache_beam%252Fpkg%252Fbeam%252Fcore%252Fruntime%252Fharness%252Fharness%2523method%252520DataChannel.removeReader&gsn=removeReader&ct=xref_usages]([id|https://cs.corp.google.com/piper///depot/google3/third_party/golang/apache_beam/pkg/beam/core/runtime/harness/datamgr.go?l=212&gs=kythe%253A%252F%252Fgoogle3%253Flang%253Dgo%253Fpath%253Dthird_party%252Fgolang%252Fapache_beam%252Fpkg%252Fbeam%252Fcore%252Fruntime%252Fharness%252Fharness%2523param%252520DataChannel.removeReader%25253Aid&gsn=id&ct=xref_usages]
> string) {
> [c|https://cs.corp.google.com/piper///depot/google3/third_party/golang/apache_beam/pkg/beam/core/runtime/harness/datamgr.go?l=198&ct=xref_jump_to_def&gsn=c&rcl=205012539].[mu|https://cs.corp.google.com/piper///depot/google3/third_party/golang/apache_beam/pkg/beam/core/runtime/harness/datamgr.go?l=96&ct=xref_jump_to_def&gsn=mu&rcl=205012539].[Lock|https://cs.corp.google.com/piper///depot/google3/third_party/go/gc/src/sync/mutex.go?l=72&ct=xref_jump_to_def&gsn=Lock&rcl=205012539]()
>
> delete([c|https://cs.corp.google.com/piper///depot/google3/third_party/golang/apache_beam/pkg/beam/core/runtime/harness/datamgr.go?l=212&ct=xref_jump_to_def&gsn=c&rcl=205012539].[readers|https://cs.corp.google.com/piper///depot/google3/third_party/golang/apache_beam/pkg/beam/core/runtime/harness/datamgr.go?l=93&ct=xref_jump_to_def&gsn=readers&rcl=205012539],
>
> [id|https://cs.corp.google.com/piper///depot/google3/third_party/golang/apache_beam/pkg/beam/core/runtime/harness/datamgr.go?l=212&ct=xref_jump_to_def&gsn=id&rcl=205012539])
>
> [c|https://cs.corp.google.com/piper///depot/google3/third_party/golang/apache_beam/pkg/beam/core/runtime/harness/datamgr.go?l=198&ct=xref_jump_to_def&gsn=c&rcl=205012539].[mu|https://cs.corp.google.com/piper///depot/google3/third_party/golang/apache_beam/pkg/beam/core/runtime/harness/datamgr.go?l=96&ct=xref_jump_to_def&gsn=mu&rcl=205012539].[Unlock|https://cs.corp.google.com/piper///depot/google3/third_party/go/gc/src/sync/mutex.go?l=175&ct=xref_jump_to_def&gsn=Unlock&rcl=205012539]()
> }
>
>
> I fatal error: concurrent map writes
> I
> I goroutine 3277 [running]:
> I runtime.throw(0xf880d0, 0x15)
> I GOROOT/src/runtime/panic.go:616 +0x81 fp=0xc4212eb6d8 sp=0xc4212eb6b8
> pc=0x42be31
> I runtime.mapdelete_faststr(0xe18160, 0xc4202ba7b0, 0xc4213dc0e0, 0x20)
> I GOROOT/src/runtime/hashmap_fast.go:892 +0x28d fp=0xc4212eb738
> sp=0xc4212eb6d8 pc=0x40e45d
> I
> google/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.(*DataChannel).removeReader(...)
>
> I
> vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/datamgr.go:213
>
> I
> google/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.(*dataReader).Close(0xc420ba8c80,
> 0xc42031e0c0, 0xf)
> I
> vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/datamgr.go:241
> +0x6e fp=0xc4212eb768 sp=0xc4212eb738 pc=0xc5059e
> I
> google/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*DataSource).Process(0xc4203ae540,
> 0x10708c0, 0xc420464e70, 0x0, 0x0)
> I
> vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/datasource.go:156
> +0x1302 fp=0xc4212ebbd8 sp=0xc4212eb768 pc=0x894612
> I
> google/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(Root).Process-fm(0x10708c0,
> 0xc420464e70, 0xc420b7bc58, 0x0)
> I
> vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:112
> +0x43 fp=0xc4212ebc10 sp=0xc4212ebbd8 pc=0x8a4c23
> I
> google/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic(0x10708c0,
> 0xc420464e70, 0xc420b7bcc8, 0x0, 0x0)
> I
> vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:42
> +0x6c fp=0xc4212ebc40 sp=0xc4212ebc10 pc=0x8a37ac
> I
> google/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*Plan).Execute(0xc420030770,
> 0x10708c0, 0xc420464e70, 0xc4201a85e8, 0x5, 0x1064720, 0xc4202097e0,
> 0xfb5320, 0xc4206ff940)
> I
> vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:112
> +0x3bd fp=0xc4212ebd58 sp=0xc4212ebc40 pc=0x89d4ed
> I
> google/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.(*control).handleInstruction(0xc4201a7820,
> 0x10708c0, 0xc420464e10, 0xc420d5e340, 0xc420038018)
> I
> vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:193
> +0x729 fp=0xc4212ebf40 sp=0xc4212ebd58 pc=0xc51ef9
> I
> google/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main.func2(0x1070840,
> 0xc420038018, 0xc420d5e340)
> I
> vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:113
> +0x167 fp=0xc4212ebfc8 sp=0xc4212ebf40 pc=0xc54b17
> I runtime.goexit()
> I
> bazel-out/k8-fastbuild/bin/external/io_bazel_rules_go/linux_amd64_stripped/stdlib~/src/runtime/asm_amd64.s:2361
> +0x1 fp=0xc4212ebfd0 sp=0xc4212ebfc8 pc=0x45a591
> I created by
> google/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main
>
> I
> vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:126
> +0x5cc
> I
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)