lostluck commented on a change in pull request #15657:
URL: https://github.com/apache/beam/pull/15657#discussion_r731158201



##########
File path: sdks/go/pkg/beam/core/metrics/metrics.go
##########
@@ -189,6 +197,8 @@ func (t kind) String() string {
                return "Distribution"
        case kindGauge:
                return "Gauge"
+       case kindMsec:
+               return "kindMsec"

Review comment:
       ```suggestion
                return "DoFnMsec"
   ```

##########
File path: sdks/go/pkg/beam/core/metrics/store.go
##########
@@ -145,16 +152,83 @@ type ptCounterSet struct {
        gauges        map[nameHash]*gauge
 }
 
+// Bundle processing state (START_BUNDLE, PROCESS_BUNDLE, FINISH_BUNDLE)
+type bundleProcState int
+
+const (
+       StartBundle   bundleProcState = 0
+       ProcessBundle bundleProcState = 1
+       FinishBundle  bundleProcState = 2
+       TotalBundle   bundleProcState = 3
+)
+
+// ExecutionState stores the information about a bundle in a particular state.
+type ExecutionState struct {
+       State        bundleProcState
+       IsProcessing bool // set to true when sent as a response to 
ProcessBundleProgress Request
+       TotalTime    time.Duration
+}
+
+// executionTracker stores information about a PTransform for execution time 
metrics.
+type executionTracker struct {
+       numberOfTransitions       int64
+       millisSinceLastTransition time.Duration
+       transitionsAtLastSample   int64
+       currentState              bundleProcState
+       pid                       string
+}
+
+func SetPTransformState(ctx context.Context, state bundleProcState) {
+       if bctx, ok := ctx.(*beamCtx); ok {
+               pid := bctx.ptransformID
+               bctx.store.mu.Lock()
+               bctx.store.executionStore.pid = pid
+               bctx.store.executionStore.currentState = state
+               bctx.store.mu.Unlock()
+               atomic.AddInt64(&bctx.store.executionStore.numberOfTransitions, 
1)
+       }
+}
+
+type CurrentStateVal struct {
+       pid         string
+       state       bundleProcState
+       transitions int64
+}
+
+func loadCurrentState(ctx context.Context) CurrentStateVal {
+       if bctx, ok := ctx.(*beamCtx); ok {
+               bctx.store.mu.Lock()
+               pid := bctx.store.executionStore.pid
+
+               state := bctx.store.executionStore.currentState
+               bctx.store.mu.Unlock()
+               return CurrentStateVal{pid: pid, state: state, transitions: 
atomic.LoadInt64(&bctx.store.executionStore.numberOfTransitions)}
+       }
+       panic("execution store not yet set.")
+}
+
 // Store retains per transform countersets, intended for per bundle use.
 type Store struct {
        mu  sync.RWMutex
        css []*ptCounterSet
 
-       store map[Labels]userMetric
+       store          map[Labels]userMetric
+       executionStore *executionTracker
+
+       stateRegistry map[string][4]*ExecutionState
 }
 
 func newStore() *Store {
-       return &Store{store: make(map[Labels]userMetric)}
+       return &Store{store: make(map[Labels]userMetric), executionStore: 
&executionTracker{}, stateRegistry: make(map[string][4]*ExecutionState)}
+}
+
+// GetRegistry return the state registry.
+func (s *Store) GetRegistry() map[string][4]*ExecutionState {
+       return s.stateRegistry
+}
+
+func (s *Store) GetExecutionStore() *executionTracker {

Review comment:
       We never call this, please remove it.

##########
File path: sdks/go/pkg/beam/core/metrics/sampler.go
##########
@@ -30,60 +30,24 @@ func NewSampler(ctx context.Context, store *Store) 
StateSampler {
        return StateSampler{store: store}
 }
 
-func initialize() [4]*ExecutionState {
-       var v [4]*ExecutionState
-       for i := 0; i < 4; i++ {
-               v[i] = &ExecutionState{}
-       }
-       return v
-}
-
 func (s *StateSampler) Sample(ctx context.Context, t time.Duration) {
-       ps := loadPTransformState(ctx)
-       pid := PTransformLabels(ps.pid)
-
+       ps := loadCurrentState(ctx)
        s.store.mu.Lock()
        defer s.store.mu.Unlock()

Review comment:
       Note that we're OK with using the lock here, because the StateSampler 
will acquire the lock relatively infrequently, once every ~200ms, and not hold 
it for very long at all. The SetPTransformState call can be called 
millions of times in that interval, making it a very contentious lock, which is 
why we must avoid grabbing the lock in that call.

##########
File path: sdks/go/pkg/beam/core/metrics/store.go
##########
@@ -214,20 +213,21 @@ type Store struct {
        css []*ptCounterSet
 
        store          map[Labels]userMetric
-       executionStore map[Labels]*executionTracker
-       stateRegistry  map[Labels][4]*ExecutionState
+       executionStore *executionTracker
+
+       stateRegistry map[string][4]*ExecutionState
 }
 
 func newStore() *Store {
-       return &Store{store: make(map[Labels]userMetric), executionStore: 
make(map[Labels]*executionTracker), stateRegistry: 
make(map[Labels][4]*ExecutionState)}
+       return &Store{store: make(map[Labels]userMetric), executionStore: 
&executionTracker{}, stateRegistry: make(map[string][4]*ExecutionState)}
 }
 
 // GetRegistry return the state registry.
-func (s *Store) GetRegistry() map[Labels][4]*ExecutionState {
+func (s *Store) GetRegistry() map[string][4]*ExecutionState {

Review comment:
       Since this exported method is only used in the Test, can we get rid of 
it?

##########
File path: sdks/go/pkg/beam/core/metrics/sampler_test.go
##########
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metrics
+
+import (
+       "context"
+       "testing"
+       "time"
+)
+
+func TestSampler(t *testing.T) {

Review comment:
       Time to add a benchmark or three! You can read a little bit about 
writing micro-benchmarks in Go, in the testing package doc: 
https://pkg.go.dev/testing#hdr-Benchmarks  They're basically special kinds of 
test, that start with `func Benchmark` instead of `func Test`.  A longer 
description is over here: 
https://dev.to/mcaci/introduction-to-benchmarks-in-go-3cii
   
   We should have 3 benchmarks for now. One just for the SetPTransformState 
operation, one for the Sampler's Sample operation, and one for using them both 
at the same time (yes, you'll finally get to use `go` in a test.).
   
   For those first two:
   ```
   1. Initialize Bundle and PTransform contexts.
   2. Repeatedly call the function under test until the benchmark exits.
   ```
   
   For the combo benchmark:
   ```
   1. Initialize the bundle contexts.
   2. Start a sampler goroutine with the bundle context (very similar to how it 
works in the harness package). 
     - This goroutine should loop and sleep, but every 5ms instead of 200ms.
   3. Initialize 2 different PTransform contexts from the bundle context.
   4. Iterate on calling SetPTransformState with each of the PTransform 
contexts until the benchmark ends.
   5. After the end of the benchmark loop, make sure the sampler goroutine is 
stopped.
   ```
   Note that these benchmarks are not doing any of their own verification. This 
is fine; that's the job of the tests, not the benchmarks.
   
   While we are worried about lock contention, we most certainly want to avoid 
allocations during Sampling and Setting the State as much as possible, so we'll 
start there. We can use the -benchmem flag when running a benchmark to get this 
information. It's our goal to get these to 0, if possible.
   
   In the directory with the test, it'll look something like... `go test 
-benchmem -run=^$ -bench='^BenchmarkMYCODE'`
   
   To end up seeing lock contention and similar, we need to use pprof in 
conjunction with the benchmarks. 
   
   See https://rakyll.org/mutexprofile/ for an example of doing that.  For more 
on the pprof tool see https://golang.org/doc/diagnostics#profiling and here's a 
good intro to using the tool, in particular the web view: 
https://jvns.ca/blog/2017/09/24/profiling-go-with-pprof/
   
   Using Go Benchmark with pprof: 
https://medium.com/@felipedutratine/profile-your-benchmark-with-pprof-fb7070ee1a94

##########
File path: sdks/go/pkg/beam/core/metrics/sampler.go
##########
@@ -30,60 +30,24 @@ func NewSampler(ctx context.Context, store *Store) 
StateSampler {
        return StateSampler{store: store}
 }
 
-func initialize() [4]*ExecutionState {
-       var v [4]*ExecutionState
-       for i := 0; i < 4; i++ {
-               v[i] = &ExecutionState{}
-       }
-       return v
-}
-
 func (s *StateSampler) Sample(ctx context.Context, t time.Duration) {
-       ps := loadPTransformState(ctx)
-       pid := PTransformLabels(ps.pid)
-
+       ps := loadCurrentState(ctx)
        s.store.mu.Lock()
        defer s.store.mu.Unlock()
 
-       if _, ok := s.store.stateRegistry[pid]; !ok {
-               s.store.stateRegistry[pid] = initialize()
-       }
-       if v, ok := s.store.stateRegistry[pid]; ok {
+       if v, ok := s.store.stateRegistry[ps.pid]; ok {
                v[ps.state].TotalTime += t
                v[TotalBundle].TotalTime += t
-       }
 
-       if _, ok := s.store.executionStore[pid]; !ok {
-               s.store.executionStore[pid] = &executionTracker{}
-       }
-       if v, ok := s.store.executionStore[pid]; ok {
+               e := s.store.executionStore
 
-               if v.transitionsAtLastSample != ps.transitions {
-                       // state change
-                       v.millisSinceLastTransition = 0
-                       v.numberOfTransitions = ps.transitions
-                       v.transitionsAtLastSample = ps.transitions
+               if e.transitionsAtLastSample != ps.transitions {
+                       // state change detected
+                       e.millisSinceLastTransition = 0
+                       e.numberOfTransitions = ps.transitions

Review comment:
       Since `numberOfTransitions` is always the same as 
`transitionsAtLastSample` we should get rid of that field.
   
   To check the expected transitions, get the number of transitions from 
`loadCurrentState(ctx)` since that will be the accurate count, rather than "the 
count at last sample" which is what this value is always providing.

##########
File path: sdks/go/pkg/beam/core/metrics/store.go
##########
@@ -174,36 +174,35 @@ type executionTracker struct {
        numberOfTransitions       int64
        millisSinceLastTransition time.Duration
        transitionsAtLastSample   int64
-}
-
-// executionState is used to store as atomic.Value in Store.
-type pTransformState struct {
-       pid   string
-       state bundleProcState
-}
-
-func newPTransformState(pid string, state bundleProcState) *pTransformState {
-       return &pTransformState{pid: pid, state: state}
+       currentState              bundleProcState
+       pid                       string
 }
 
 func SetPTransformState(ctx context.Context, state bundleProcState) {
        if bctx, ok := ctx.(*beamCtx); ok {
-               ps := newPTransformState(bctx.ptransformID, state)
-               bctx.pStore.Store(ps)
-               atomic.AddInt64(&bctx.transitions, 1)
+               pid := bctx.ptransformID
+               bctx.store.mu.Lock()
+               bctx.store.executionStore.pid = pid
+               bctx.store.executionStore.currentState = state
+               bctx.store.mu.Unlock()
+               atomic.AddInt64(&bctx.store.executionStore.numberOfTransitions, 
1)

Review comment:
       Do this after implementing the benchmarks.
   
   Make the `store.executionStore` field an `atomic.Value` and write 
the states to that instead. We don't want to use `store.mu` to lock these 
interactions, since that would block the reads from the progress tracking 
goroutine. 
   
   This means the numberOfTransitions will need to move to live on the store as 
well. That allows the atomic to continue to be used. 
   
   Note, this means you should also remove the unused `pStore` field from the 
`beamCtx` type.
   
   It does mean we'll be allocating a new `type BundleState struct{ pid, 
state}` for each call to this, but that's fine for now. We'll benchmark this on 
this iteration, and if the allocations are a problem (and they will be), we'll 
move to an approach where the 3 versions of the state are pre-allocated, so we 
can quickly swap between them on each call. This is where some kind of 
`PTransformState` will come in handy on top of the metrics.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to