lostluck commented on a change in pull request #15657:
URL: https://github.com/apache/beam/pull/15657#discussion_r731158201
##########
File path: sdks/go/pkg/beam/core/metrics/metrics.go
##########
@@ -189,6 +197,8 @@ func (t kind) String() string {
return "Distribution"
case kindGauge:
return "Gauge"
+ case kindMsec:
+ return "kindMsec"
Review comment:
```suggestion
return "DoFnMsec"
```
##########
File path: sdks/go/pkg/beam/core/metrics/store.go
##########
@@ -145,16 +152,83 @@ type ptCounterSet struct {
gauges map[nameHash]*gauge
}
+// Bundle processing state (START_BUNDLE, PROCESS_BUNDLE, FINISH_BUNDLE)
+type bundleProcState int
+
+const (
+ StartBundle bundleProcState = 0
+ ProcessBundle bundleProcState = 1
+ FinishBundle bundleProcState = 2
+ TotalBundle bundleProcState = 3
+)
+
+// ExecutionState stores the information about a bundle in a particular state.
+type ExecutionState struct {
+ State bundleProcState
+ IsProcessing bool // set to true when sent as a response to
ProcessBundleProgress Request
+ TotalTime time.Duration
+}
+
+// executionTracker stores information about a PTransform for execution time
metrics.
+type executionTracker struct {
+ numberOfTransitions int64
+ millisSinceLastTransition time.Duration
+ transitionsAtLastSample int64
+ currentState bundleProcState
+ pid string
+}
+
+func SetPTransformState(ctx context.Context, state bundleProcState) {
+ if bctx, ok := ctx.(*beamCtx); ok {
+ pid := bctx.ptransformID
+ bctx.store.mu.Lock()
+ bctx.store.executionStore.pid = pid
+ bctx.store.executionStore.currentState = state
+ bctx.store.mu.Unlock()
+ atomic.AddInt64(&bctx.store.executionStore.numberOfTransitions,
1)
+ }
+}
+
+type CurrentStateVal struct {
+ pid string
+ state bundleProcState
+ transitions int64
+}
+
+func loadCurrentState(ctx context.Context) CurrentStateVal {
+ if bctx, ok := ctx.(*beamCtx); ok {
+ bctx.store.mu.Lock()
+ pid := bctx.store.executionStore.pid
+
+ state := bctx.store.executionStore.currentState
+ bctx.store.mu.Unlock()
+ return CurrentStateVal{pid: pid, state: state, transitions:
atomic.LoadInt64(&bctx.store.executionStore.numberOfTransitions)}
+ }
+ panic("execution store not yet set.")
+}
+
// Store retains per transform countersets, intended for per bundle use.
type Store struct {
mu sync.RWMutex
css []*ptCounterSet
- store map[Labels]userMetric
+ store map[Labels]userMetric
+ executionStore *executionTracker
+
+ stateRegistry map[string][4]*ExecutionState
}
func newStore() *Store {
- return &Store{store: make(map[Labels]userMetric)}
+ return &Store{store: make(map[Labels]userMetric), executionStore:
&executionTracker{}, stateRegistry: make(map[string][4]*ExecutionState)}
+}
+
+// GetRegistry return the state registry.
+func (s *Store) GetRegistry() map[string][4]*ExecutionState {
+ return s.stateRegistry
+}
+
+func (s *Store) GetExecutionStore() *executionTracker {
Review comment:
We never call this, please remove it.
##########
File path: sdks/go/pkg/beam/core/metrics/sampler.go
##########
@@ -30,60 +30,24 @@ func NewSampler(ctx context.Context, store *Store)
StateSampler {
return StateSampler{store: store}
}
-func initialize() [4]*ExecutionState {
- var v [4]*ExecutionState
- for i := 0; i < 4; i++ {
- v[i] = &ExecutionState{}
- }
- return v
-}
-
func (s *StateSampler) Sample(ctx context.Context, t time.Duration) {
- ps := loadPTransformState(ctx)
- pid := PTransformLabels(ps.pid)
-
+ ps := loadCurrentState(ctx)
s.store.mu.Lock()
defer s.store.mu.Unlock()
Review comment:
Note that we're OK with using the lock here, because the StateSampler
will acquire the lock relatively infrequently, once every ~200ms, and not hold
it for very long at all. The SetPTransformState call, by contrast, can be called
millions of times in that interval, which would make it a very contentious lock;
that is why we must avoid grabbing the lock in that call.
##########
File path: sdks/go/pkg/beam/core/metrics/store.go
##########
@@ -214,20 +213,21 @@ type Store struct {
css []*ptCounterSet
store map[Labels]userMetric
- executionStore map[Labels]*executionTracker
- stateRegistry map[Labels][4]*ExecutionState
+ executionStore *executionTracker
+
+ stateRegistry map[string][4]*ExecutionState
}
func newStore() *Store {
- return &Store{store: make(map[Labels]userMetric), executionStore:
make(map[Labels]*executionTracker), stateRegistry:
make(map[Labels][4]*ExecutionState)}
+ return &Store{store: make(map[Labels]userMetric), executionStore:
&executionTracker{}, stateRegistry: make(map[string][4]*ExecutionState)}
}
// GetRegistry return the state registry.
-func (s *Store) GetRegistry() map[Labels][4]*ExecutionState {
+func (s *Store) GetRegistry() map[string][4]*ExecutionState {
Review comment:
Since this exported method is only used in the Test, can we get rid of
it?
##########
File path: sdks/go/pkg/beam/core/metrics/sampler_test.go
##########
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metrics
+
+import (
+ "context"
+ "testing"
+ "time"
+)
+
+func TestSampler(t *testing.T) {
Review comment:
Time to add a benchmark or three! You can read a little bit about
writing micro-benchmarks in Go in the testing package doc:
https://pkg.go.dev/testing#hdr-Benchmarks
They're basically a special kind of test that starts with `func Benchmark`
instead of `func Test`. A longer description is over here:
https://dev.to/mcaci/introduction-to-benchmarks-in-go-3cii
We should have 3 benchmarks for now: one just for the SetPTransformState
operation, one for the Sampler's Sample operation, and one for using them both
at the same time (yes, you'll finally get to use `go` in a test).
For those first two:
```
1. Initialize Bundle and PTransform contexts.
2. Repeatedly call the function under test until the benchmark exits.
```
For the combo benchmark:
```
1. Initialize the bundle contexts.
2. Start a sampler goroutine with the bundle context (very similar to how it
works in the harness package).
- This goroutine should loop and sleep, but every 5ms instead of every 200ms.
3. Initialize 2 different PTransform contexts from the bundle context.
4. Iterate on calling SetPTransformState with each of the PTransform
contexts until the benchmark ends.
5. After the end of the benchmark loop, make sure the sampler goroutine is
stopped.
```
Note that these benchmarks are not doing any of their own verification. This
is fine; that's the job of the tests, not the benchmarks.
While we are worried about lock contention, we most certainly want to avoid
allocations during sampling and setting the state as much as possible, so we'll
start there. We can use the -benchmem flag when running a benchmark to get this
information. Our goal is to get allocations to 0, if possible.
In the directory with the test, it'll look something like... `go test
-benchmem -run=^$ -bench='^BenchmarkMYCODE'`
To see lock contention and similar issues, we need to use pprof in
conjunction with the benchmarks.
See https://rakyll.org/mutexprofile/ for an example of doing that. For more
on the pprof tool see https://golang.org/doc/diagnostics#profiling and here's a
good intro to using the tool, in particular the web view:
https://jvns.ca/blog/2017/09/24/profiling-go-with-pprof/
Using Go Benchmark with pprof:
https://medium.com/@felipedutratine/profile-your-benchmark-with-pprof-fb7070ee1a94
##########
File path: sdks/go/pkg/beam/core/metrics/sampler.go
##########
@@ -30,60 +30,24 @@ func NewSampler(ctx context.Context, store *Store)
StateSampler {
return StateSampler{store: store}
}
-func initialize() [4]*ExecutionState {
- var v [4]*ExecutionState
- for i := 0; i < 4; i++ {
- v[i] = &ExecutionState{}
- }
- return v
-}
-
func (s *StateSampler) Sample(ctx context.Context, t time.Duration) {
- ps := loadPTransformState(ctx)
- pid := PTransformLabels(ps.pid)
-
+ ps := loadCurrentState(ctx)
s.store.mu.Lock()
defer s.store.mu.Unlock()
- if _, ok := s.store.stateRegistry[pid]; !ok {
- s.store.stateRegistry[pid] = initialize()
- }
- if v, ok := s.store.stateRegistry[pid]; ok {
+ if v, ok := s.store.stateRegistry[ps.pid]; ok {
v[ps.state].TotalTime += t
v[TotalBundle].TotalTime += t
- }
- if _, ok := s.store.executionStore[pid]; !ok {
- s.store.executionStore[pid] = &executionTracker{}
- }
- if v, ok := s.store.executionStore[pid]; ok {
+ e := s.store.executionStore
- if v.transitionsAtLastSample != ps.transitions {
- // state change
- v.millisSinceLastTransition = 0
- v.numberOfTransitions = ps.transitions
- v.transitionsAtLastSample = ps.transitions
+ if e.transitionsAtLastSample != ps.transitions {
+ // state change detected
+ e.millisSinceLastTransition = 0
+ e.numberOfTransitions = ps.transitions
Review comment:
Since `numberOfTransitions` is always the same as
`transitionsAtLastSample`, we should get rid of that field.
To check the expected transitions, get the number of transitions from
`loadCurrentState(ctx)`, since that will be the accurate count, rather than "the
count at last sample", which is what this field always provides.
##########
File path: sdks/go/pkg/beam/core/metrics/store.go
##########
@@ -174,36 +174,35 @@ type executionTracker struct {
numberOfTransitions int64
millisSinceLastTransition time.Duration
transitionsAtLastSample int64
-}
-
-// executionState is used to store as atomic.Value in Store.
-type pTransformState struct {
- pid string
- state bundleProcState
-}
-
-func newPTransformState(pid string, state bundleProcState) *pTransformState {
- return &pTransformState{pid: pid, state: state}
+ currentState bundleProcState
+ pid string
}
func SetPTransformState(ctx context.Context, state bundleProcState) {
if bctx, ok := ctx.(*beamCtx); ok {
- ps := newPTransformState(bctx.ptransformID, state)
- bctx.pStore.Store(ps)
- atomic.AddInt64(&bctx.transitions, 1)
+ pid := bctx.ptransformID
+ bctx.store.mu.Lock()
+ bctx.store.executionStore.pid = pid
+ bctx.store.executionStore.currentState = state
+ bctx.store.mu.Unlock()
+ atomic.AddInt64(&bctx.store.executionStore.numberOfTransitions,
1)
Review comment:
Do this after implementing the benchmarks.
Instead, make the `store.executionStore` field an `atomic.Value` and write
the states to that. We don't want to use `store.mu` to lock these
interactions, since that would block the reads from the progress tracking
goroutine.
This means `numberOfTransitions` will need to move onto the `Store` itself as
well, which allows the atomic counter to continue to be used.
Note, this means you should also remove the unused `pStore` field from the
`beamCtx` type.
It does mean we'll be allocating a new `type BundleState struct{ pid,
state}` for each call to this, but that's fine for now. We'll benchmark this on
this iteration, and if the allocations are a problem (and they will be), we'll
move to an approach where the 3 versions of the state are pre-allocated, so we
can quickly swap between them on each call. This is where some kind of
`PTransformState` will come in handy on top of the metrics.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]