riteshghorse commented on a change in pull request #15657:
URL: https://github.com/apache/beam/pull/15657#discussion_r724685672



##########
File path: sdks/go/pkg/beam/core/metrics/store.go
##########
@@ -145,16 +150,73 @@ type ptCounterSet struct {
        gauges        map[nameHash]*gauge
 }
 
+// Bundle processing state (START_BUNDLE, PROCESS_BUNDLE, FINISH_BUNDLE)
+type BundleProcState string
+
+const (
+       StartBundle   BundleProcState = "START_BUNDLE"
+       ProcessBundle BundleProcState = "PROCESS_BUNDLE"
+       FinishBundle  BundleProcState = "FINISH_BUNDLE"
+)
+
+// ExecutionState stores the information about a bundle in a particular state.
+type ExecutionState struct {
+       State           BundleProcState
+       IsProcessing    bool // set to true when sent as a response to 
ProcessBundleProgress Request
+       TotalTimeMillis int64
+}
+
+// ExecutionStateTracker stores information about a bundle for execution time 
metrics.
+type ExecutionStateTracker struct {
+       CurrentState              BundleProcState
+       State                     ExecutionState
+       NumberOfTransitions       int
+       MillisSinceLastTransition int
+       TransitionsAtLastSample   int
+       LastSampleTimeMillis      int
+}
+
 // Store retains per transform countersets, intended for per bundle use.
 type Store struct {
        mu  sync.RWMutex
        css []*ptCounterSet
 
        store map[Labels]userMetric
+
+       executionStore ExecutionStateTracker
+       stateRegistry  map[Labels][]ExecutionState
 }
 
 func newStore() *Store {
-       return &Store{store: make(map[Labels]userMetric)}
+       return &Store{store: make(map[Labels]userMetric), stateRegistry: 
make(map[Labels][]ExecutionState)}
+}
+
+// AddState adds an ExecutionState of a PTransform to the stateRegistry.
+func (s *Store) AddState(e ExecutionState, pid string) {
+       label := PTransformLabels(pid)
+       if s.stateRegistry == nil {
+               s.stateRegistry = make(map[Labels][]ExecutionState)
+       }
+       s.stateRegistry[label] = append(s.stateRegistry[label], e)
+}
+
+// SetState updates the state of a bundle.
+// For the bundle in start state, update the ExecutionState struct as well.
+func (s *Store) SetState(bs BundleProcState) {
+       if bs == StartBundle {
+               s.executionStore.State.State = bs
+       }
+       s.executionStore.CurrentState = bs
+}
+
+// IncTransitions increment the number of transitions by 1.
+func (s *Store) IncTransitions() {
+       s.executionStore.NumberOfTransitions += 1
+}
+
+// GetRegistry return the state registry.
+func (s *Store) GetRegistry() map[Labels][]ExecutionState {
+       return s.stateRegistry
 }

Review comment:
       I think we can build an abstraction for msecs metrics similar to the one 
we have for user metrics, and these methods can then live on ExecutionStateTracker. 
However, I'm not clear on what "initialize the sampler in each exec.ParDo in the 
Up method (which is what Up is for), and then pass the context into the methods 
for it that way instead." implies.  
   
   As of now, once the actual sampler code moves to the metrics package, the 
harness package is responsible for getting a new sampler from the metrics package 
and calling methods on it such as `start()` and `stop()`. Those harness methods in 
turn call the corresponding methods on the sampler in the metrics package.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to