riteshghorse commented on a change in pull request #15657:
URL: https://github.com/apache/beam/pull/15657#discussion_r730078534



##########
File path: sdks/go/pkg/beam/core/runtime/harness/harness.go
##########
@@ -316,13 +316,20 @@ func (c *control) handleInstruction(ctx context.Context, 
req *fnpb.InstructionRe
                if err != nil {
                        return fail(ctx, instID, "Failed: %v", err)
                }
+               // Start the sampler goroutine here
+               sampler := newSampler(ctx, store)
+               sampler.start(ctx, time.Millisecond*200)

Review comment:
       Got it. Missed this. Thanks!

##########
File path: sdks/go/pkg/beam/core/metrics/sampler.go
##########
@@ -17,58 +17,73 @@ package metrics
 
 import (
        "context"
-       "sync/atomic"
        "time"
 )
 
 // StateSampler tracks the state of a bundle.
 type StateSampler struct {
-       done  chan (int) // signal to stop sampling
-       e     *ExecutionTracker
-       store *Store
-       total int64
+       store *Store // used to store states into state registry
 }
 
 // NewSampler creates a new state sampler.
 func NewSampler(ctx context.Context, store *Store) StateSampler {
-       return StateSampler{done: make(chan int), e: getExecutionStore(ctx), 
store: store}
+       return StateSampler{store: store}
 }
 
-// Start is called from the harness package repeatedly whenever required
-func (s *StateSampler) Start(ctx context.Context, t time.Duration) {
-       s.startSampler(ctx, t)
+func initialize() [4]*ExecutionState {
+       var v [4]*ExecutionState
+       for i := 0; i < 4; i++ {
+               v[i] = &ExecutionState{}
+       }
+       return v
 }
 
-func (s *StateSampler) startSampler(ctx context.Context, t time.Duration) {
-       if loadTransitions(ctx) != 
atomic.LoadInt64(&(s.e.TransitionsAtLastSample)) {
-               s.sample(ctx)
+func (s *StateSampler) Sample(ctx context.Context, t time.Duration) {
+       ps := loadPTransformState(ctx)
+       pid := PTransformLabels(ps.pid)
+
+       s.store.mu.Lock()
+       defer s.store.mu.Unlock()
+
+       if _, ok := s.store.stateRegistry[pid]; !ok {
+               s.store.stateRegistry[pid] = initialize()
+       }
+       if v, ok := s.store.stateRegistry[pid]; ok {
+               v[ps.state].TotalTime += t
+               v[TotalBundle].TotalTime += t
        }
 
-       atomic.AddInt64(&s.e.MillisSinceLastTransition, int64(t))
-       atomic.AddInt64(&s.e.State.TotalTimeMillis, int64(t))
-}
+       if _, ok := s.store.executionStore[pid]; !ok {
+               s.store.executionStore[pid] = &executionTracker{}
+       }
+       if v, ok := s.store.executionStore[pid]; ok {
 
-func (s *StateSampler) Stop(ctx context.Context, t time.Duration) {
-       s.stopSampler(ctx, t)
+               if v.transitionsAtLastSample != ps.transitions {
+                       // state change
+                       v.millisSinceLastTransition = 0
+                       v.numberOfTransitions = ps.transitions
+                       v.transitionsAtLastSample = ps.transitions
+               } else {
+                       v.millisSinceLastTransition += t
+               }
+       }
 }
 
-func (s *StateSampler) stopSampler(ctx context.Context, t time.Duration) {
-       // collect the remaining metrics (finish bundle metrics)
-       s.sample(ctx)
-       // add final state
-
-       ex := ExecutionState{State: TotalBundle, TotalTimeMillis: s.total}
-       s.store.AddState(ex, loadPTransformState(ctx).pid)
+// Start is called from the harness package repeatedly whenever required
+func (s *StateSampler) Start(ctx context.Context, t time.Duration) {
+       s.Sample(ctx, t)
 }
 
-func (s *StateSampler) sample(ctx context.Context) {
+func (s *StateSampler) Stop(ctx context.Context) {

Review comment:
       Okay. This would mean that in some cases, when the PTransform transitions 
to FinishBundle and immediately stops, we won't have a record for that. But since 
we are aiming for approximate time, this won't make much of a difference.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to