lostluck commented on a change in pull request #15657:
URL: https://github.com/apache/beam/pull/15657#discussion_r737870109
##########
File path: sdks/go/pkg/beam/core/metrics/metrics.go
##########
@@ -136,6 +136,10 @@ func SetPTransformID(ctx context.Context, id string)
context.Context {
// Checking for *beamCtx is an optimization, so we don't dig deeply
// for ids if not necessary.
if bctx, ok := ctx.(*beamCtx); ok {
+ if _, ok := bctx.store.stateRegistry[id]; !ok {
+ bctx.store.stateRegistry[id] =
[4]*ExecutionState{&ExecutionState{}, &ExecutionState{}, &ExecutionState{},
&ExecutionState{}}
Review comment:
Consider whether one can use `[4]ExecutionState{}` instead, which will
automatically initialize the values.
##########
File path: sdks/go/pkg/beam/core/metrics/sampler.go
##########
@@ -28,28 +29,37 @@ type StateSampler struct {
}
// NewSampler creates a new state sampler.
-func NewSampler(ctx context.Context, store *Store) StateSampler {
+func NewSampler(store *Store) StateSampler {
return StateSampler{store: store}
}
-func (s *StateSampler) Sample(ctx context.Context, t time.Duration) {
- ps := loadCurrentState(ctx)
+func (s *StateSampler) Sample(t time.Duration) {
Review comment:
The Sample function should have a context so it can use the beam logger
for lull logging, when we hit >=5 minutes without transitions.
##########
File path: sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go
##########
@@ -51,42 +53,65 @@ func groupByType(minfos []*pipepb.MonitoringInfo) (
}
r := bytes.NewReader(minfo.GetPayload())
-
- switch minfo.GetType() {
- case "beam:metrics:sum_int64:v1":
+ switch minfo.GetUrn() {
+ case "beam:metric:user:sum_int64:v1":
value, err := extractCounterValue(r)
if err != nil {
log.Println(err)
continue
}
counters[key] = value
- case "beam:metrics:distribution_int64:v1":
+ case "beam:metric:user:distribution_int64:v1":
value, err := extractDistributionValue(r)
if err != nil {
log.Println(err)
continue
}
distributions[key] = value
case
- "beam:metrics:latest_int64:v1",
- "beam:metrics:top_n_int64:v1",
- "beam:metrics:bottom_n_int64:v1":
+ "beam:metric:user:latest_int64:v1",
+ "beam:metric:user:top_n_int64:v1",
+ "beam:metric:user:bottom_n_int64:v1":
value, err := extractGaugeValue(r)
if err != nil {
log.Println(err)
continue
}
gauges[key] = value
+ case
+
"beam:metric:pardo_execution_time:start_bundle_msecs:v1",
Review comment:
Consider using `UrnToString(UrnStartBundle)` and similar to avoid
duplicating the string constants where they are prone to typos. Within a
package, the compiler is clever enough to inline things.
##########
File path: sdks/go/pkg/beam/core/metrics/sampler_test.go
##########
@@ -45,38 +45,38 @@ func TestSampler(t *testing.T) {
bctx := SetBundleID(ctx, "test")
st := GetStore(bctx)
- s := NewSampler(bctx, st)
+ s := NewSampler(st)
pctx := SetPTransformID(bctx, "transform")
label := "transform"
SetPTransformState(pctx, StartBundle)
- s.Sample(bctx, 200*time.Millisecond)
+ s.Sample(200 * time.Millisecond)
Review comment:
To simplify the test, just make a variable `interval := 200 *
time.Millisecond` and re-use it instead of copying the multiplication
everywhere. Do it per test, rather than as a package-level constant. That way
readers of the test will know whether the value is important or not.
##########
File path: sdks/go/pkg/beam/core/metrics/store.go
##########
@@ -179,36 +180,29 @@ type BundleState struct {
func SetPTransformState(ctx context.Context, state bundleProcState) {
if bctx, ok := ctx.(*beamCtx); ok {
pid := bctx.ptransformID
- bctx.store.bundleState.Store(BundleState{pid: pid,
currentState: state})
+ bctx.store.states[state].pid = pid
Review comment:
The problem with this specifically is that it's a non-atomic write to
something that will be read by some other thread, which means there's a data
race. It's possible the Sampler will read the pid as it's being written to,
which is bad.
So, we need to have these pre-allocated BundleStates for each PTransform.
What if we did those allocations once per PTransform? We can do so by defining
a struct
```
type PTransformState struct{
states [3]BundleState
}
func NewPTransformState() *PTransformState {
return &PTransformState{
states: [3]BundleState{
{pid, StartBundle},
{pid, ProcessBundle},
{pid, FinishBundle},
},
}
}
func (p *PTransformState) Set(ctx context.Context, s bundleProcState) {
/// ... existing implementation of SetPTransformState, but using the
local p.states values instead ....
}
```
That should avoid the race condition, and it means that in `exec.ParDo` we add
a field `state *metrics.PTransformState`,
which we initialize in the `Up` method and call as we are currently doing.
Finally, move all this new code for the state to sampler.go, so that
everything that's related to sampling is in one place. *This does not mean
moving things out of the store struct, just changing which file they're in
(except for the states field, which has moved to the PTransformState and
should not be directly in store).*
##########
File path: sdks/go/pkg/beam/core/metrics/store.go
##########
@@ -179,36 +180,29 @@ type BundleState struct {
func SetPTransformState(ctx context.Context, state bundleProcState) {
if bctx, ok := ctx.(*beamCtx); ok {
pid := bctx.ptransformID
- bctx.store.bundleState.Store(BundleState{pid: pid,
currentState: state})
+ bctx.store.states[state].pid = pid
+
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&bctx.store.bundleState)),
unsafe.Pointer(&bctx.store.states[state]))
atomic.AddInt64(&bctx.store.transitions, 1)
}
}
-// CurrentStateVal exports the current state of a bundle wrt PTransform.
-type CurrentStateVal struct {
+// currentStateVal exports the current state of a bundle wrt PTransform.
+type currentStateVal struct {
pid string
state bundleProcState
transitions int64
}
-func loadCurrentState(ctx context.Context) CurrentStateVal {
- if bctx, ok := ctx.(*beamCtx); ok {
- bs := bctx.store.bundleState.Load().(BundleState)
- return CurrentStateVal{pid: bs.pid, state: bs.currentState,
transitions: atomic.LoadInt64(&bctx.store.transitions)}
- }
- panic("execution store not yet set.")
-}
-
// Store retains per transform countersets, intended for per bundle use.
type Store struct {
mu sync.RWMutex
css []*ptCounterSet
store map[Labels]userMetric
- transitions int64
- bundleState atomic.Value
-
+ transitions int64
+ bundleState BundleState
Review comment:
This should be a pointer to a BundleState, so it's only changing an
address, not copying the string reference and an integer.
##########
File path: sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go
##########
@@ -51,42 +53,65 @@ func groupByType(minfos []*pipepb.MonitoringInfo) (
}
r := bytes.NewReader(minfo.GetPayload())
-
- switch minfo.GetType() {
- case "beam:metrics:sum_int64:v1":
+ switch minfo.GetUrn() {
+ case "beam:metric:user:sum_int64:v1":
value, err := extractCounterValue(r)
if err != nil {
log.Println(err)
continue
}
counters[key] = value
- case "beam:metrics:distribution_int64:v1":
+ case "beam:metric:user:distribution_int64:v1":
value, err := extractDistributionValue(r)
if err != nil {
log.Println(err)
continue
}
distributions[key] = value
case
- "beam:metrics:latest_int64:v1",
- "beam:metrics:top_n_int64:v1",
- "beam:metrics:bottom_n_int64:v1":
+ "beam:metric:user:latest_int64:v1",
+ "beam:metric:user:top_n_int64:v1",
+ "beam:metric:user:bottom_n_int64:v1":
value, err := extractGaugeValue(r)
if err != nil {
log.Println(err)
continue
}
gauges[key] = value
+ case
+
"beam:metric:pardo_execution_time:start_bundle_msecs:v1",
+
"beam:metric:pardo_execution_time:process_bundle_msecs:v1",
+
"beam:metric:pardo_execution_time:finish_bundle_msecs:v1",
+ "beam:metric:ptransform_execution_time:total_msecs:v1":
+ value, err := extractMsecValue(r)
+ if err != nil {
+ log.Println(err)
+ continue
+ }
+ msecs[key] = value
default:
log.Println("unknown metric type")
}
}
- return counters, distributions, gauges
+ return counters, distributions, gauges, msecs
}
func extractKey(mi *pipepb.MonitoringInfo) (metrics.StepKey, error) {
labels := newLabels(mi.GetLabels())
stepName := labels.Transform()
+ urn := mi.GetUrn()
+ switch urn {
+ case "beam:metric:pardo_execution_time:start_bundle_msecs:v1":
Review comment:
Same comment here, WRT using UrnToString and our constants from
metricsx/urns.go.
##########
File path: sdks/go/pkg/beam/core/metrics/store.go
##########
@@ -91,6 +94,8 @@ type Extractor struct {
DistributionInt64 func(labels Labels, count, sum, min, max int64)
// GaugeInt64 extracts data from Gauge Int64 counters.
GaugeInt64 func(labels Labels, v int64, t time.Time)
+ // MsecsInt64 extracts data from StateRegistry of ExecutionState
Review comment:
Please add a comment here saying extracting Msec counters is
Experimental and subject to change. That way we can come back and clean things
up in a separate PR if needed.
##########
File path: sdks/go/pkg/beam/core/metrics/sampler.go
##########
@@ -28,28 +29,37 @@ type StateSampler struct {
}
// NewSampler creates a new state sampler.
-func NewSampler(ctx context.Context, store *Store) StateSampler {
+func NewSampler(store *Store) StateSampler {
return StateSampler{store: store}
}
-func (s *StateSampler) Sample(ctx context.Context, t time.Duration) {
- ps := loadCurrentState(ctx)
+func (s *StateSampler) Sample(t time.Duration) {
+ ps := loadCurrentState(s)
+ if ps.pid == "" {
+ return
+ }
s.store.mu.Lock()
defer s.store.mu.Unlock()
if v, ok := s.store.stateRegistry[ps.pid]; ok {
v[ps.state].TotalTime += t
v[TotalBundle].TotalTime += t
- e := s.store
-
if s.transitionsAtLastSample != ps.transitions {
// state change detected
s.millisSinceLastTransition = 0
- e.transitions = ps.transitions
s.transitionsAtLastSample = ps.transitions
} else {
s.millisSinceLastTransition += t
}
Review comment:
Add another `if` clause here to report lulls.
We don't want to log every sample after the log interval, only once every 5
minutes. So we can have two new fields. One variable that has the logging
interval (5 minutes) and another that tracks what time "since" we want to log
next.
```
if s.millisSinceLastTransition > s.nextLogTime {
log.Info( ...standard long running operation log ... , pid, state,
s.millisSinceLastTransition)
s.nextLogTime += s.logInterval
}
```
This means we need to reset the `s.nextLogTime` every time the samples
reset, so we can log the next time a transition takes 5 minutes.
Finally, since we are using time.Duration as the type for
`s.millisSinceLastTransition`, call it `s.timeSinceLastTransition` instead. It
doesn't matter that we're later translating the value to milliseconds
here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org