riteshghorse commented on a change in pull request #15657:
URL: https://github.com/apache/beam/pull/15657#discussion_r724685672
##########
File path: sdks/go/pkg/beam/core/metrics/store.go
##########
@@ -145,16 +150,73 @@ type ptCounterSet struct {
gauges map[nameHash]*gauge
}
+// Bundle processing state (START_BUNDLE, PROCESS_BUNDLE, FINISH_BUNDLE)
+type BundleProcState string
+
+const (
+ StartBundle BundleProcState = "START_BUNDLE"
+ ProcessBundle BundleProcState = "PROCESS_BUNDLE"
+ FinishBundle BundleProcState = "FINISH_BUNDLE"
+)
+
+// ExecutionState stores the information about a bundle in a particular state.
+type ExecutionState struct {
+ State BundleProcState
+ IsProcessing bool // set to true when sent as a response to
ProcessBundleProgress Request
+ TotalTimeMillis int64
+}
+
+// ExecutionStateTracker stores information about a bundle for execution time
metrics.
+type ExecutionStateTracker struct {
+ CurrentState BundleProcState
+ State ExecutionState
+ NumberOfTransitions int
+ MillisSinceLastTransition int
+ TransitionsAtLastSample int
+ LastSampleTimeMillis int
+}
+
// Store retains per transform countersets, intended for per bundle use.
type Store struct {
mu sync.RWMutex
css []*ptCounterSet
store map[Labels]userMetric
+
+ executionStore ExecutionStateTracker
+ stateRegistry map[Labels][]ExecutionState
}
func newStore() *Store {
- return &Store{store: make(map[Labels]userMetric)}
+ return &Store{store: make(map[Labels]userMetric), stateRegistry:
make(map[Labels][]ExecutionState)}
+}
+
+// AddState adds an ExecutionState of a PTransform to the stateRegistry.
+func (s *Store) AddState(e ExecutionState, pid string) {
+ label := PTransformLabels(pid)
+ if s.stateRegistry == nil {
+ s.stateRegistry = make(map[Labels][]ExecutionState)
+ }
+ s.stateRegistry[label] = append(s.stateRegistry[label], e)
+}
+
+// SetState updates the state of a bundle.
+// For the bundle in start state, update the ExecutionState struct as well.
+func (s *Store) SetState(bs BundleProcState) {
+ if bs == StartBundle {
+ s.executionStore.State.State = bs
+ }
+ s.executionStore.CurrentState = bs
+}
+
+// IncTransitions increment the number of transitions by 1.
+func (s *Store) IncTransitions() {
+ s.executionStore.NumberOfTransitions += 1
+}
+
+// GetRegistry return the state registry.
+func (s *Store) GetRegistry() map[Labels][]ExecutionState {
+ return s.stateRegistry
}
Review comment:
I think we can have an abstraction for msecs as well, similar to the one for
user metrics, and these methods can be added to ExecutionStateTracker. However,
I'm not clear on what "initialize the sampler in each exec.ParDo in the Up
method (which is what Up is for), and then pass the context into the methods
for it that way instead." implies.
As of now, when we move the actual sampler code to the metrics package, the
harness package has control to get a new sampler from the metrics package
and call methods on it like `start()` and `stop()`. These methods call the
methods on the sampler from the metrics package.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]