lostluck commented on a change in pull request #15657:
URL: https://github.com/apache/beam/pull/15657#discussion_r722462734
##########
File path: sdks/go/pkg/beam/core/runtime/exec/pardo.go
##########
@@ -121,6 +124,10 @@ func (n *ParDo) ProcessElement(_ context.Context, elm
*FullValue, values ...ReSt
return errors.Errorf("invalid status for pardo %v: %v, want
Active", n.UID, n.status)
}
+ // set current state for execution time metrics
+ metrics.GetStore(n.ctx).SetState(metrics.ProcessBundle)
+ metrics.GetStore(n.ctx).IncTransitions()
Review comment:
GetStore isn't very lightweight (extracting things from contexts never
is), so get the Store once, and then call SetState and IncTransitions on it,
rather than getting it twice.
##########
File path: sdks/go/pkg/beam/core/runtime/harness/harness.go
##########
@@ -273,6 +273,54 @@ func (c *control) getOrCreatePlan(bdID bundleDescriptorID)
(*exec.Plan, error) {
return plan, nil
}
+// Sampler provides methods for implementing state sampling
+// to track execution time metrics.
+type Sampler interface {
+ startSampler()
+ stopSampler()
+ sample()
+}
+
+// StateSampler tracks the state of a bundle.
+type StateSampler struct {
+ done chan (int) // signal to stop sampling
+ e *metrics.ExecutionStateTracker
+ store *metrics.Store
+ pid string // PTransform ID
+}
+
+func newSampler(e *metrics.ExecutionStateTracker, store *metrics.Store, pid
string) StateSampler {
+ return StateSampler{done: make(chan int), e: e, store: store, pid: pid}
+}
+
+func (s StateSampler) startSampler() {
+ for {
+ select {
+ case <-s.done:
+ return
+ default:
+ if s.e.NumberOfTransitions !=
s.e.TransitionsAtLastSample {
+ s.sample()
+ }
+ s.e.MillisSinceLastTransition += 200
+ s.e.State.TotalTimeMillis += 200
+ time.Sleep(200 * time.Millisecond)
+ }
+ }
+}
+
+func (s StateSampler) stopSampler() {
+ s.done <- 1
+}
+
+func (s StateSampler) sample() {
+ s.e.TransitionsAtLastSample = s.e.NumberOfTransitions
+ s.e.MillisSinceLastTransition = 0
+ s.store.AddState(s.e.State, s.pid)
+ s.e.State.State = s.e.CurrentState
+ s.e.State.TotalTimeMillis = 0
+}
Review comment:
If we make the default case in the sampler loop thread-safe, then this
method doesn't need to be thread-safe itself, as long as it's always called
within the lock's critical section (i.e., while the lock is held).
##########
File path: sdks/go/pkg/beam/core/runtime/exec/pardo.go
##########
@@ -121,6 +124,10 @@ func (n *ParDo) ProcessElement(_ context.Context, elm
*FullValue, values ...ReSt
return errors.Errorf("invalid status for pardo %v: %v, want
Active", n.UID, n.status)
}
+ // set current state for execution time metrics
+ metrics.GetStore(n.ctx).SetState(metrics.ProcessBundle)
+ metrics.GetStore(n.ctx).IncTransitions()
Review comment:
Note: this is somewhat obsoleted by another comment about how to change the
approach.
##########
File path: sdks/go/pkg/beam/core/metrics/store.go
##########
@@ -145,16 +150,73 @@ type ptCounterSet struct {
gauges map[nameHash]*gauge
}
+// Bundle processing state (START_BUNDLE, PROCESS_BUNDLE, FINISH_BUNDLE)
+type BundleProcState string
+
+const (
+ StartBundle BundleProcState = "START_BUNDLE"
+ ProcessBundle BundleProcState = "PROCESS_BUNDLE"
+ FinishBundle BundleProcState = "FINISH_BUNDLE"
+)
Review comment:
Two comments here:
1st. There's no need to export the BundleProcState type (instead, make it
bundleProcState). This prevents new states from being defined outside of the
package. Users can always use the ones defined here in the constants, because
the constants themselves are exported.
2nd. We should not be using strings as a base type. String comparisons are
expensive since they need to go character by character. Use an int instead,
which will be much faster to compare. You can always add a `String() string`
method to the type that translates 0, 1, 2 to the appropriate strings.
##########
File path: sdks/go/pkg/beam/core/runtime/harness/harness.go
##########
@@ -273,6 +273,54 @@ func (c *control) getOrCreatePlan(bdID bundleDescriptorID)
(*exec.Plan, error) {
return plan, nil
}
+// Sampler provides methods for implementing state sampling
+// to track execution time metrics.
+type Sampler interface {
+ startSampler()
+ stopSampler()
+ sample()
+}
+
+// StateSampler tracks the state of a bundle.
+type StateSampler struct {
+ done chan (int) // signal to stop sampling
+ e *metrics.ExecutionStateTracker
+ store *metrics.Store
+ pid string // PTransform ID
+}
+
+func newSampler(e *metrics.ExecutionStateTracker, store *metrics.Store, pid
string) StateSampler {
+ return StateSampler{done: make(chan int), e: e, store: store, pid: pid}
+}
+
+func (s StateSampler) startSampler() {
+ for {
+ select {
+ case <-s.done:
+ return
+ default:
+ if s.e.NumberOfTransitions !=
s.e.TransitionsAtLastSample {
+ s.sample()
+ }
+ s.e.MillisSinceLastTransition += 200
+ s.e.State.TotalTimeMillis += 200
Review comment:
This set of operations is not thread-safe.
##########
File path: sdks/go/pkg/beam/core/runtime/harness/harness.go
##########
@@ -273,6 +273,54 @@ func (c *control) getOrCreatePlan(bdID bundleDescriptorID)
(*exec.Plan, error) {
return plan, nil
}
+// Sampler provides methods for implementing state sampling
+// to track execution time metrics.
+type Sampler interface {
+ startSampler()
+ stopSampler()
+ sample()
+}
+
+// StateSampler tracks the state of a bundle.
+type StateSampler struct {
+ done chan (int) // signal to stop sampling
+ e *metrics.ExecutionStateTracker
+ store *metrics.Store
+ pid string // PTransform ID
+}
+
+func newSampler(e *metrics.ExecutionStateTracker, store *metrics.Store, pid
string) StateSampler {
+ return StateSampler{done: make(chan int), e: e, store: store, pid: pid}
+}
+
+func (s StateSampler) startSampler() {
+ for {
+ select {
+ case <-s.done:
+ return
+ default:
+ if s.e.NumberOfTransitions !=
s.e.TransitionsAtLastSample {
+ s.sample()
+ }
+ s.e.MillisSinceLastTransition += 200
+ s.e.State.TotalTimeMillis += 200
+ time.Sleep(200 * time.Millisecond)
Review comment:
The harness package should be what handles the times/sleeps, however. We
can have the tick/increment method on the execution state tracker take in the
number of milliseconds that have passed. That will make it easier to test by
itself (avoiding real time in tests).
To avoid lock contention or similar, this implies we might need both the tick
method and a "sampleAndTick" method, to avoid too much lock churn.
##########
File path: sdks/go/pkg/beam/core/runtime/exec/pardo.go
##########
@@ -198,6 +205,10 @@ func (n *ParDo) FinishBundle(_ context.Context) error {
n.status = Up
n.inv.Reset()
+ // set current state for execution time metrics
+ metrics.GetStore(n.ctx).SetState(metrics.FinishBundle)
+ metrics.GetStore(n.ctx).IncTransitions()
Review comment:
Same here.
##########
File path: sdks/go/pkg/beam/core/runtime/harness/harness.go
##########
@@ -273,6 +273,54 @@ func (c *control) getOrCreatePlan(bdID bundleDescriptorID)
(*exec.Plan, error) {
return plan, nil
}
+// Sampler provides methods for implementing state sampling
+// to track execution time metrics.
+type Sampler interface {
+ startSampler()
+ stopSampler()
+ sample()
+}
Review comment:
I don't think this interface is doing anything for us. Can we remove it?
##########
File path: sdks/go/pkg/beam/core/metrics/store.go
##########
@@ -145,16 +150,73 @@ type ptCounterSet struct {
gauges map[nameHash]*gauge
}
+// Bundle processing state (START_BUNDLE, PROCESS_BUNDLE, FINISH_BUNDLE)
+type BundleProcState string
+
+const (
+ StartBundle BundleProcState = "START_BUNDLE"
+ ProcessBundle BundleProcState = "PROCESS_BUNDLE"
+ FinishBundle BundleProcState = "FINISH_BUNDLE"
+)
+
+// ExecutionState stores the information about a bundle in a particular state.
+type ExecutionState struct {
+ State BundleProcState
+ IsProcessing bool // set to true when sent as a response to
ProcessBundleProgress Request
+ TotalTimeMillis int64
+}
+
+// ExecutionStateTracker stores information about a bundle for execution time
metrics.
+type ExecutionStateTracker struct {
+ CurrentState BundleProcState
+ State ExecutionState
+ NumberOfTransitions int
+ MillisSinceLastTransition int
+ TransitionsAtLastSample int
+ LastSampleTimeMillis int
+}
+
// Store retains per transform countersets, intended for per bundle use.
type Store struct {
mu sync.RWMutex
css []*ptCounterSet
store map[Labels]userMetric
+
+ executionStore ExecutionStateTracker
+ stateRegistry map[Labels][]ExecutionState
}
func newStore() *Store {
- return &Store{store: make(map[Labels]userMetric)}
+ return &Store{store: make(map[Labels]userMetric), stateRegistry:
make(map[Labels][]ExecutionState)}
+}
+
+// AddState adds an ExecutionState of a PTransform to the stateRegistry.
+func (s *Store) AddState(e ExecutionState, pid string) {
+ label := PTransformLabels(pid)
+ if s.stateRegistry == nil {
+ s.stateRegistry = make(map[Labels][]ExecutionState)
+ }
+ s.stateRegistry[label] = append(s.stateRegistry[label], e)
+}
+
+// SetState updates the state of a bundle.
+// For the bundle in start state, update the ExecutionState struct as well.
+func (s *Store) SetState(bs BundleProcState) {
+ if bs == StartBundle {
+ s.executionStore.State.State = bs
+ }
+ s.executionStore.CurrentState = bs
+}
+
+// IncTransitions increment the number of transitions by 1.
+func (s *Store) IncTransitions() {
+ s.executionStore.NumberOfTransitions += 1
+}
+
+// GetRegistry return the state registry.
+func (s *Store) GetRegistry() map[Labels][]ExecutionState {
+ return s.stateRegistry
}
Review comment:
If you look at the other methods for the Store (I guess that's
`storeMetric`), note that it acquires the lock. None of these operations are
concurrency-safe between reading and writing threads. We need to take that into
account.
##########
File path: sdks/go/pkg/beam/core/metrics/store.go
##########
@@ -145,16 +150,73 @@ type ptCounterSet struct {
gauges map[nameHash]*gauge
}
+// Bundle processing state (START_BUNDLE, PROCESS_BUNDLE, FINISH_BUNDLE)
+type BundleProcState string
+
+const (
+ StartBundle BundleProcState = "START_BUNDLE"
+ ProcessBundle BundleProcState = "PROCESS_BUNDLE"
+ FinishBundle BundleProcState = "FINISH_BUNDLE"
+)
+
+// ExecutionState stores the information about a bundle in a particular state.
+type ExecutionState struct {
+ State BundleProcState
+ IsProcessing bool // set to true when sent as a response to
ProcessBundleProgress Request
+ TotalTimeMillis int64
+}
+
+// ExecutionStateTracker stores information about a bundle for execution time
metrics.
+type ExecutionStateTracker struct {
Review comment:
Name-wise, we can probably shorten this to `ExecutionTracker`; "State"
doesn't add much to it.
##########
File path: sdks/go/pkg/beam/core/metrics/metrics.go
##########
@@ -155,6 +155,14 @@ func GetStore(ctx context.Context) *Store {
return nil
}
+// GetExecutionStore extracts the metrics ExecutionStateTracker for the
+// given context of a bundle.
+// Ensure that the store for the bundle exist before calling this function.
+func GetExecutionStore(ctx context.Context) *ExecutionStateTracker {
+ store := GetStore(ctx)
+ return &store.executionStore
+}
Review comment:
Let's try to avoid exposing internal state like this, as it complicates
the concurrency handling. This implies we might want to move more of the state
sampler handling into the metrics package, with lighter configuration and
actuation in the harness. After all, the harness is largely signalling "Hey,
contribute whatever the current difference is to your current state and start
counting again".
##########
File path: sdks/go/pkg/beam/core/runtime/harness/harness.go
##########
@@ -273,6 +273,54 @@ func (c *control) getOrCreatePlan(bdID bundleDescriptorID)
(*exec.Plan, error) {
return plan, nil
}
+// Sampler provides methods for implementing state sampling
+// to track execution time metrics.
+type Sampler interface {
+ startSampler()
+ stopSampler()
+ sample()
+}
+
+// StateSampler tracks the state of a bundle.
+type StateSampler struct {
+ done chan (int) // signal to stop sampling
+ e *metrics.ExecutionStateTracker
+ store *metrics.Store
+ pid string // PTransform ID
+}
+
+func newSampler(e *metrics.ExecutionStateTracker, store *metrics.Store, pid
string) StateSampler {
+ return StateSampler{done: make(chan int), e: e, store: store, pid: pid}
+}
+
+func (s StateSampler) startSampler() {
Review comment:
Prefer pointer receivers over value receivers for the state
sampler. With a value receiver, every method call copies all of the struct's
fields, which takes time; a pointer receiver avoids that copy.
##########
File path: sdks/go/pkg/beam/core/metrics/store.go
##########
@@ -145,16 +150,73 @@ type ptCounterSet struct {
gauges map[nameHash]*gauge
}
+// Bundle processing state (START_BUNDLE, PROCESS_BUNDLE, FINISH_BUNDLE)
+type BundleProcState string
+
+const (
+ StartBundle BundleProcState = "START_BUNDLE"
+ ProcessBundle BundleProcState = "PROCESS_BUNDLE"
+ FinishBundle BundleProcState = "FINISH_BUNDLE"
+)
+
+// ExecutionState stores the information about a bundle in a particular state.
+type ExecutionState struct {
+ State BundleProcState
+ IsProcessing bool // set to true when sent as a response to
ProcessBundleProgress Request
+ TotalTimeMillis int64
+}
+
+// ExecutionStateTracker stores information about a bundle for execution time
metrics.
+type ExecutionStateTracker struct {
+ CurrentState BundleProcState
+ State ExecutionState
+ NumberOfTransitions int
+ MillisSinceLastTransition int
+ TransitionsAtLastSample int
+ LastSampleTimeMillis int
+}
+
// Store retains per transform countersets, intended for per bundle use.
type Store struct {
mu sync.RWMutex
css []*ptCounterSet
store map[Labels]userMetric
+
+ executionStore ExecutionStateTracker
+ stateRegistry map[Labels][]ExecutionState
Review comment:
Changing the "states" to be ints also lets us change these slices to
fixed arrays: `[3]ExecutionState`, since we know there are always those 3 states.
##########
File path: sdks/go/pkg/beam/core/metrics/store.go
##########
@@ -145,16 +150,73 @@ type ptCounterSet struct {
gauges map[nameHash]*gauge
}
+// Bundle processing state (START_BUNDLE, PROCESS_BUNDLE, FINISH_BUNDLE)
+type BundleProcState string
+
+const (
+ StartBundle BundleProcState = "START_BUNDLE"
+ ProcessBundle BundleProcState = "PROCESS_BUNDLE"
+ FinishBundle BundleProcState = "FINISH_BUNDLE"
+)
+
+// ExecutionState stores the information about a bundle in a particular state.
+type ExecutionState struct {
+ State BundleProcState
+ IsProcessing bool // set to true when sent as a response to
ProcessBundleProgress Request
+ TotalTimeMillis int64
+}
+
+// ExecutionStateTracker stores information about a bundle for execution time
metrics.
+type ExecutionStateTracker struct {
+ CurrentState BundleProcState
+ State ExecutionState
+ NumberOfTransitions int
+ MillisSinceLastTransition int
+ TransitionsAtLastSample int
+ LastSampleTimeMillis int
+}
+
// Store retains per transform countersets, intended for per bundle use.
type Store struct {
mu sync.RWMutex
css []*ptCounterSet
store map[Labels]userMetric
+
+ executionStore ExecutionStateTracker
+ stateRegistry map[Labels][]ExecutionState
}
func newStore() *Store {
- return &Store{store: make(map[Labels]userMetric)}
+ return &Store{store: make(map[Labels]userMetric), stateRegistry:
make(map[Labels][]ExecutionState)}
+}
+
+// AddState adds an ExecutionState of a PTransform to the stateRegistry.
+func (s *Store) AddState(e ExecutionState, pid string) {
+ label := PTransformLabels(pid)
+ if s.stateRegistry == nil {
+ s.stateRegistry = make(map[Labels][]ExecutionState)
+ }
+ s.stateRegistry[label] = append(s.stateRegistry[label], e)
+}
+
+// SetState updates the state of a bundle.
+// For the bundle in start state, update the ExecutionState struct as well.
+func (s *Store) SetState(bs BundleProcState) {
+ if bs == StartBundle {
+ s.executionStore.State.State = bs
+ }
+ s.executionStore.CurrentState = bs
+}
+
+// IncTransitions increment the number of transitions by 1.
+func (s *Store) IncTransitions() {
+ s.executionStore.NumberOfTransitions += 1
+}
+
+// GetRegistry return the state registry.
+func (s *Store) GetRegistry() map[Labels][]ExecutionState {
+ return s.stateRegistry
}
Review comment:
Instead of putting these as methods on Store, let's have them as
methods on an "ExecutionStateTracker" object in the metrics package (similar to,
but not identical to, how the existing user metrics operate). We initialize the
sampler in each `exec.ParDo` in the `Up` method (which is what `Up` is for),
and then pass the context into its methods that way instead.
That way all the concurrency handling is isolated in the metrics package,
which, as you've already observed, has similar mechanisms at work.
It doesn't need to be 100% identical to the existing metrics because we know
we're creating one of these per Bundle anyway (which is not always true for
user metrics). The main difference is we don't need the separated "counterSet"
like we have for the user counters, since that's to deal with runtime-declared
counters that are constantly created per call.
##########
File path: sdks/go/pkg/beam/core/metrics/store.go
##########
@@ -145,16 +150,73 @@ type ptCounterSet struct {
gauges map[nameHash]*gauge
}
+// Bundle processing state (START_BUNDLE, PROCESS_BUNDLE, FINISH_BUNDLE)
+type BundleProcState string
+
+const (
+ StartBundle BundleProcState = "START_BUNDLE"
+ ProcessBundle BundleProcState = "PROCESS_BUNDLE"
+ FinishBundle BundleProcState = "FINISH_BUNDLE"
Review comment:
While not technically a state, a "TotalBundle" would be a good way to
track/handle the total metrics in the same way as the other states.
##########
File path: sdks/go/pkg/beam/core/metrics/store.go
##########
@@ -145,16 +150,73 @@ type ptCounterSet struct {
gauges map[nameHash]*gauge
}
+// Bundle processing state (START_BUNDLE, PROCESS_BUNDLE, FINISH_BUNDLE)
+type BundleProcState string
+
+const (
+ StartBundle BundleProcState = "START_BUNDLE"
+ ProcessBundle BundleProcState = "PROCESS_BUNDLE"
+ FinishBundle BundleProcState = "FINISH_BUNDLE"
+)
+
+// ExecutionState stores the information about a bundle in a particular state.
+type ExecutionState struct {
+ State BundleProcState
+ IsProcessing bool // set to true when sent as a response to
ProcessBundleProgress Request
+ TotalTimeMillis int64
+}
+
+// ExecutionStateTracker stores information about a bundle for execution time
metrics.
+type ExecutionStateTracker struct {
+ CurrentState BundleProcState
+ State ExecutionState
+ NumberOfTransitions int
+ MillisSinceLastTransition int
+ TransitionsAtLastSample int
+ LastSampleTimeMillis int
+}
+
// Store retains per transform countersets, intended for per bundle use.
type Store struct {
mu sync.RWMutex
css []*ptCounterSet
store map[Labels]userMetric
+
+ executionStore ExecutionStateTracker
+ stateRegistry map[Labels][]ExecutionState
}
func newStore() *Store {
- return &Store{store: make(map[Labels]userMetric)}
+ return &Store{store: make(map[Labels]userMetric), stateRegistry:
make(map[Labels][]ExecutionState)}
+}
+
+// AddState adds an ExecutionState of a PTransform to the stateRegistry.
+func (s *Store) AddState(e ExecutionState, pid string) {
+ label := PTransformLabels(pid)
+ if s.stateRegistry == nil {
+ s.stateRegistry = make(map[Labels][]ExecutionState)
+ }
+ s.stateRegistry[label] = append(s.stateRegistry[label], e)
Review comment:
AddState is being called in the sample method, and nothing clears these
away. That means that each individual sample is retained for the life of the
bundle.
This is probably not noticeable in streaming, but in batch a bundle can last for
hours, and that will add up to an astounding number of allocations.
We need to sum up contributions for the same PTransform + State combination
(the bundle is implied by construction). This can be facilitated by having the
state be an int under the hood, and using a fixed-size array as mentioned
elsewhere.
To be concrete, a Bundle that has 1 DoFn should have at most 4 exec metrics
reported for it (start, process, finish, and total). 4 DoFns would have 16, etc.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]