lostluck commented on a change in pull request #15657:
URL: https://github.com/apache/beam/pull/15657#discussion_r737870109



##########
File path: sdks/go/pkg/beam/core/metrics/metrics.go
##########
@@ -136,6 +136,10 @@ func SetPTransformID(ctx context.Context, id string) 
context.Context {
        // Checking for *beamCtx is an optimization, so we don't dig deeply
        // for ids if not necessary.
        if bctx, ok := ctx.(*beamCtx); ok {
+               if _, ok := bctx.store.stateRegistry[id]; !ok {
+                       bctx.store.stateRegistry[id] = 
[4]*ExecutionState{&ExecutionState{}, &ExecutionState{}, &ExecutionState{}, 
&ExecutionState{}}

Review comment:
       Consider whether one can use `[4]ExecutionState{}` instead, which will 
automatically initialize the values.

##########
File path: sdks/go/pkg/beam/core/metrics/sampler.go
##########
@@ -28,28 +29,37 @@ type StateSampler struct {
 }
 
 // NewSampler creates a new state sampler.
-func NewSampler(ctx context.Context, store *Store) StateSampler {
+func NewSampler(store *Store) StateSampler {
        return StateSampler{store: store}
 }
 
-func (s *StateSampler) Sample(ctx context.Context, t time.Duration) {
-       ps := loadCurrentState(ctx)
+func (s *StateSampler) Sample(t time.Duration) {

Review comment:
       The Sample function should have a context so it can use the beam logger 
for lull logging, when we hit >=5 minutes without transitions.

##########
File path: sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go
##########
@@ -51,42 +53,65 @@ func groupByType(minfos []*pipepb.MonitoringInfo) (
                }
 
                r := bytes.NewReader(minfo.GetPayload())
-
-               switch minfo.GetType() {
-               case "beam:metrics:sum_int64:v1":
+               switch minfo.GetUrn() {
+               case "beam:metric:user:sum_int64:v1":
                        value, err := extractCounterValue(r)
                        if err != nil {
                                log.Println(err)
                                continue
                        }
                        counters[key] = value
-               case "beam:metrics:distribution_int64:v1":
+               case "beam:metric:user:distribution_int64:v1":
                        value, err := extractDistributionValue(r)
                        if err != nil {
                                log.Println(err)
                                continue
                        }
                        distributions[key] = value
                case
-                       "beam:metrics:latest_int64:v1",
-                       "beam:metrics:top_n_int64:v1",
-                       "beam:metrics:bottom_n_int64:v1":
+                       "beam:metric:user:latest_int64:v1",
+                       "beam:metric:user:top_n_int64:v1",
+                       "beam:metric:user:bottom_n_int64:v1":
                        value, err := extractGaugeValue(r)
                        if err != nil {
                                log.Println(err)
                                continue
                        }
                        gauges[key] = value
+               case
+                       
"beam:metric:pardo_execution_time:start_bundle_msecs:v1",

Review comment:
       Consider using `UrnToString(UrnStartBundle)` and similar to avoid 
duplicating the string constants where they are prone to typos. Within a 
package, the compiler is clever enough to inline things.

##########
File path: sdks/go/pkg/beam/core/metrics/sampler_test.go
##########
@@ -45,38 +45,38 @@ func TestSampler(t *testing.T) {
        bctx := SetBundleID(ctx, "test")
 
        st := GetStore(bctx)
-       s := NewSampler(bctx, st)
+       s := NewSampler(st)
 
        pctx := SetPTransformID(bctx, "transform")
        label := "transform"
 
        SetPTransformState(pctx, StartBundle)
-       s.Sample(bctx, 200*time.Millisecond)
+       s.Sample(200 * time.Millisecond)

Review comment:
       To simplify the test, just make a variable `interval := 200 * 
time.Millisecond` and re-use it instead of copying the multiplication 
everywhere. Do it per test, rather than a package level constant. That way 
readers of the test will know whether what's important about it or not.

##########
File path: sdks/go/pkg/beam/core/metrics/store.go
##########
@@ -179,36 +180,29 @@ type BundleState struct {
 func SetPTransformState(ctx context.Context, state bundleProcState) {
        if bctx, ok := ctx.(*beamCtx); ok {
                pid := bctx.ptransformID
-               bctx.store.bundleState.Store(BundleState{pid: pid, 
currentState: state})
+               bctx.store.states[state].pid = pid

Review comment:
       The problem with this specifically is that it's a non-atomic write to 
something that will be read by some other thread, which means there's a data 
race. It's possible the Sampler will read the pid as it's being written to, 
which is bad.
   
   So, we need to have these pre-allocated BundleStates for each PTransform. 
What if we did those allocations once per PTransform? We can do so by defining 
a struct
   
   ```
   type PTransformState struct{
     states [3]BundleState
   }
   
   func NewPTransformState() *PTransformState {
     return &PTransformState{
       states: [3]BundleState{
         {pid, StartBundle},
         {pid, ProcessBundle},
         {pid, FinishBundle},
       },
     }
   }
   
   func (p *PTransformState) Set(ctx context.Context, s bundState) {
      /// ... existing implementation of SetPTransformState, but using the 
local s.states values instead ....
   }
   ```
   
   That should avoid the race condition, and mean in `exec.ParDo` we add a 
field  `state *metrics.PTransformState`
   which we initialize in the `Up` method, and call as we are currently doing.
   
   Finally, move all this new code for the state to sampler.go, so that 
everything that's related to sampling is in one place. *This does not mean 
moving things out of the store struct,just which file it's in. (except for the 
states field, which has just moved to the the PTransformState, and should not 
be directly in store.) *
   
   
   
   

##########
File path: sdks/go/pkg/beam/core/metrics/store.go
##########
@@ -179,36 +180,29 @@ type BundleState struct {
 func SetPTransformState(ctx context.Context, state bundleProcState) {
        if bctx, ok := ctx.(*beamCtx); ok {
                pid := bctx.ptransformID
-               bctx.store.bundleState.Store(BundleState{pid: pid, 
currentState: state})
+               bctx.store.states[state].pid = pid
+               
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&bctx.store.bundleState)), 
unsafe.Pointer(&bctx.store.states[state]))
                atomic.AddInt64(&bctx.store.transitions, 1)
        }
 }
 
-// CurrentStateVal exports the current state of a bundle wrt PTransform.
-type CurrentStateVal struct {
+// currentStateVal exports the current state of a bundle wrt PTransform.
+type currentStateVal struct {
        pid         string
        state       bundleProcState
        transitions int64
 }
 
-func loadCurrentState(ctx context.Context) CurrentStateVal {
-       if bctx, ok := ctx.(*beamCtx); ok {
-               bs := bctx.store.bundleState.Load().(BundleState)
-               return CurrentStateVal{pid: bs.pid, state: bs.currentState, 
transitions: atomic.LoadInt64(&bctx.store.transitions)}
-       }
-       panic("execution store not yet set.")
-}
-
 // Store retains per transform countersets, intended for per bundle use.
 type Store struct {
        mu  sync.RWMutex
        css []*ptCounterSet
 
        store map[Labels]userMetric
 
-       transitions int64
-       bundleState atomic.Value
-
+       transitions   int64
+       bundleState   BundleState

Review comment:
       This should be a pointer to a BundleState, so it's only changing an 
address, not copying the string reference and an integer.

##########
File path: sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go
##########
@@ -51,42 +53,65 @@ func groupByType(minfos []*pipepb.MonitoringInfo) (
                }
 
                r := bytes.NewReader(minfo.GetPayload())
-
-               switch minfo.GetType() {
-               case "beam:metrics:sum_int64:v1":
+               switch minfo.GetUrn() {
+               case "beam:metric:user:sum_int64:v1":
                        value, err := extractCounterValue(r)
                        if err != nil {
                                log.Println(err)
                                continue
                        }
                        counters[key] = value
-               case "beam:metrics:distribution_int64:v1":
+               case "beam:metric:user:distribution_int64:v1":
                        value, err := extractDistributionValue(r)
                        if err != nil {
                                log.Println(err)
                                continue
                        }
                        distributions[key] = value
                case
-                       "beam:metrics:latest_int64:v1",
-                       "beam:metrics:top_n_int64:v1",
-                       "beam:metrics:bottom_n_int64:v1":
+                       "beam:metric:user:latest_int64:v1",
+                       "beam:metric:user:top_n_int64:v1",
+                       "beam:metric:user:bottom_n_int64:v1":
                        value, err := extractGaugeValue(r)
                        if err != nil {
                                log.Println(err)
                                continue
                        }
                        gauges[key] = value
+               case
+                       
"beam:metric:pardo_execution_time:start_bundle_msecs:v1",
+                       
"beam:metric:pardo_execution_time:process_bundle_msecs:v1",
+                       
"beam:metric:pardo_execution_time:finish_bundle_msecs:v1",
+                       "beam:metric:ptransform_execution_time:total_msecs:v1":
+                       value, err := extractMsecValue(r)
+                       if err != nil {
+                               log.Println(err)
+                               continue
+                       }
+                       msecs[key] = value
                default:
                        log.Println("unknown metric type")
                }
        }
-       return counters, distributions, gauges
+       return counters, distributions, gauges, msecs
 }
 
 func extractKey(mi *pipepb.MonitoringInfo) (metrics.StepKey, error) {
        labels := newLabels(mi.GetLabels())
        stepName := labels.Transform()
+       urn := mi.GetUrn()
+       switch urn {
+       case "beam:metric:pardo_execution_time:start_bundle_msecs:v1":

Review comment:
       Same comment here, WRT using UrnToString and our constants from 
metricsx/urns.go.

##########
File path: sdks/go/pkg/beam/core/metrics/store.go
##########
@@ -91,6 +94,8 @@ type Extractor struct {
        DistributionInt64 func(labels Labels, count, sum, min, max int64)
        // GaugeInt64 extracts data from Gauge Int64 counters.
        GaugeInt64 func(labels Labels, v int64, t time.Time)
+       // MsecsInt64 extracts data from StateRegistry of ExecutionState

Review comment:
       Please add a comment here saying extracting Msec counters is 
Experimental and subject to change. That way we can come back and clean things 
up in a separate PR if needed.

##########
File path: sdks/go/pkg/beam/core/metrics/sampler.go
##########
@@ -28,28 +29,37 @@ type StateSampler struct {
 }
 
 // NewSampler creates a new state sampler.
-func NewSampler(ctx context.Context, store *Store) StateSampler {
+func NewSampler(store *Store) StateSampler {
        return StateSampler{store: store}
 }
 
-func (s *StateSampler) Sample(ctx context.Context, t time.Duration) {
-       ps := loadCurrentState(ctx)
+func (s *StateSampler) Sample(t time.Duration) {
+       ps := loadCurrentState(s)
+       if ps.pid == "" {
+               return
+       }
        s.store.mu.Lock()
        defer s.store.mu.Unlock()
 
        if v, ok := s.store.stateRegistry[ps.pid]; ok {
                v[ps.state].TotalTime += t
                v[TotalBundle].TotalTime += t
 
-               e := s.store
-
                if s.transitionsAtLastSample != ps.transitions {
                        // state change detected
                        s.millisSinceLastTransition = 0
-                       e.transitions = ps.transitions
                        s.transitionsAtLastSample = ps.transitions
                } else {
                        s.millisSinceLastTransition += t
                }

Review comment:
       Add another `if` clause here to report lulls.
   
   We don't want to log every sample after the log interval, only once every 5 
minutes. So we can have two new fields. One variable that has the logging 
interval (5 minutes) and another that tracks what time "since" we want to log 
next.
   
   
   ```
   if s.millisSinceLastTransition > s.nextLogTime {
     log.Info(  ...standard long running operation log ... , pid, state, 
s.millis.SinceLastTransition)
     s.nextLogTime += s.logInterval
   }
   ```
   This means we need to reset the `s.nextLogTime` every time the samples 
reset, so we can log the next time a transition takes 5 minutes.
   
   Finally, since we are using time.Duration as the type for 
`s.millisSinceLastTransition` call it `s.timeSinceLastTransition` instead. It 
doesn't matter that we're later translating things to milliseconds in this one 
here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to