lostluck commented on a change in pull request #15657:
URL: https://github.com/apache/beam/pull/15657#discussion_r732242562
##########
File path: sdks/go/pkg/beam/core/metrics/metrics.go
##########
@@ -189,6 +193,8 @@ func (t kind) String() string {
return "Distribution"
case kindGauge:
return "Gauge"
+ case DoFnMsec:
+ return "DoFnMsec"
Review comment:
The goal of my previous comment was to change only the returned string, not the constant's name, which should keep the `kind` prefix so it isn't needlessly exported.
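A minimal sketch of the suggested shape, assuming a simplified stand-in for the `kind` type in `metrics.go`. The constant keeps its unexported `kind` prefix, and only `String()` returns `"DoFnMsec"`. `kindUnknown`, the `"Counter"` string, and the `kind(%d)` fallback are assumptions for illustration, not copied from the PR.

```go
package main

import "fmt"

// kind identifies the type of a metric. The type and its constants are
// unexported, so they stay internal to the package.
type kind uint8

const (
	kindUnknown kind = iota // assumed zero value, for illustration only
	kindSumCounter
	kindDistribution
	kindGauge
	kindDoFnMSec // unexported constant; only its String() form is "DoFnMsec"
)

// String returns the human-readable name of the kind.
func (t kind) String() string {
	switch t {
	case kindSumCounter:
		return "Counter" // assumed label
	case kindDistribution:
		return "Distribution"
	case kindGauge:
		return "Gauge"
	case kindDoFnMSec:
		return "DoFnMsec"
	default:
		// Convert to uint8 so fmt does not call String() recursively.
		return fmt.Sprintf("kind(%d)", uint8(t))
	}
}

func main() {
	for _, k := range []kind{kindGauge, kindDoFnMSec, kindUnknown} {
		fmt.Println(k)
	}
}
```

Callers outside the package only ever see the string form, so the exported API does not grow.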
##########
File path: sdks/go/pkg/beam/core/metrics/sampler.go
##########
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metrics
+
+import (
+ "context"
+ "time"
+)
+
+// StateSampler tracks the state of a bundle.
+type StateSampler struct {
+ store *Store // used to store states into state registry
+ millisSinceLastTransition time.Duration
+ transitionsAtLastSample int64
+}
+
+// NewSampler creates a new state sampler.
+func NewSampler(ctx context.Context, store *Store) StateSampler {
+ return StateSampler{store: store}
+}
+
+func (s *StateSampler) Sample(ctx context.Context, t time.Duration) {
+ ps := loadCurrentState(ctx)
Review comment:
Similarly, since the context is only used because we needed access to
the store (which we already have for the sampler), do we need to have the
context passed in?
Remember, if we don't need a given parameter or field, we should probably
get rid of it. We *do* need the context for the PTransformState calls, since
that's *how* the store is passed around for later access by users and such. But
this is part of the framework code, and can take the direct approach.
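A rough sketch of a sampler that takes no context at all, assuming a simplified `Store` that holds a mutex, the current PTransform/state pair, and a per-PTransform array of durations. `setState` and `newStore` are hypothetical stand-ins for `SetPTransformState` and `GetStore`. Unlike the PR, this `Sample` also registers unknown PTransforms on first sight.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type bundleProcState int

const (
	startBundle bundleProcState = iota
	processBundle
	finishBundle
	totalBundle
)

// currentState is a hypothetical snapshot of which PTransform is active and in which state.
type currentState struct {
	pid   string
	state bundleProcState
}

// Store is a simplified stand-in for the metrics Store.
type Store struct {
	mu            sync.Mutex
	current       currentState
	stateRegistry map[string]*[4]time.Duration // per-PTransform time spent in each state
}

// newStore is a hypothetical stand-in for GetStore.
func newStore() *Store {
	return &Store{stateRegistry: map[string]*[4]time.Duration{}}
}

// setState is a hypothetical stand-in for SetPTransformState.
func (st *Store) setState(pid string, state bundleProcState) {
	st.mu.Lock()
	defer st.mu.Unlock()
	st.current = currentState{pid: pid, state: state}
}

// StateSampler keeps a direct reference to the store, so it needs no context.
type StateSampler struct {
	store *Store
}

// NewSampler only needs the store it will sample.
func NewSampler(store *Store) StateSampler {
	return StateSampler{store: store}
}

// Sample attributes t to whatever state the store currently holds.
func (s *StateSampler) Sample(t time.Duration) {
	s.store.mu.Lock()
	defer s.store.mu.Unlock()
	cur := s.store.current
	v, ok := s.store.stateRegistry[cur.pid]
	if !ok {
		v = new([4]time.Duration)
		s.store.stateRegistry[cur.pid] = v
	}
	v[cur.state] += t
	v[totalBundle] += t
}

func main() {
	st := newStore()
	s := NewSampler(st)
	st.setState("transform", processBundle)
	s.Sample(200 * time.Millisecond)
	fmt.Println(st.stateRegistry["transform"][processBundle])
}
```

In this shape, only the user-facing state setter would still need the context; the framework-side sampler talks to the store directly.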
##########
File path: sdks/go/pkg/beam/core/metrics/sampler.go
##########
@@ -0,0 +1,55 @@
+package metrics
+
+import (
+ "context"
+ "time"
+)
+
+// StateSampler tracks the state of a bundle.
+type StateSampler struct {
+ store *Store // used to store states into state registry
+ millisSinceLastTransition time.Duration
+ transitionsAtLastSample int64
+}
+
+// NewSampler creates a new state sampler.
+func NewSampler(ctx context.Context, store *Store) StateSampler {
+ return StateSampler{store: store}
+}
+
+func (s *StateSampler) Sample(ctx context.Context, t time.Duration) {
+ ps := loadCurrentState(ctx)
+ s.store.mu.Lock()
+ defer s.store.mu.Unlock()
+
+ if v, ok := s.store.stateRegistry[ps.pid]; ok {
+ v[ps.state].TotalTime += t
+ v[TotalBundle].TotalTime += t
+
+ e := s.store
+
+ if s.transitionsAtLastSample != ps.transitions {
+ // state change detected
+ s.millisSinceLastTransition = 0
+ e.transitions = ps.transitions
Review comment:
Why are the transitions being overwritten here? Following your code, `loadCurrentState` reads the transitions from the store, and then this block writes them back to the store, without doing so atomically.
The only thing the sampler should do with the store's transitions is read them, so the value can be checked at the next call to see whether any transitions happened since the last one. It should not write them.
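A minimal sketch of the read-only pattern described above, assuming a stripped-down `Store` that holds only the transition counter. `setState` is a hypothetical stand-in for `SetPTransformState` and is the only writer. The sampler loads the counter atomically and keeps its own copy in `transitionsAtLastSample`.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// Store is a simplified stand-in; only the state setter writes transitions.
type Store struct {
	transitions int64 // incremented atomically on every state change
}

// setState stands in for SetPTransformState: it is the sole writer of transitions.
func (st *Store) setState() {
	atomic.AddInt64(&st.transitions, 1)
}

// StateSampler tracks how long the current state has lasted.
type StateSampler struct {
	store                     *Store
	millisSinceLastTransition time.Duration
	transitionsAtLastSample   int64
}

// Sample reads the store's transition count but never writes it.
func (s *StateSampler) Sample(t time.Duration) {
	transitions := atomic.LoadInt64(&s.store.transitions)
	if transitions != s.transitionsAtLastSample {
		// At least one transition since the previous sample: restart the timer
		// and remember the count locally for the next comparison.
		s.millisSinceLastTransition = 0
		s.transitionsAtLastSample = transitions
		return
	}
	// No transition since the previous sample: the state lasted another t.
	s.millisSinceLastTransition += t
}

func main() {
	st := &Store{}
	s := StateSampler{store: st}
	st.setState()
	s.Sample(200 * time.Millisecond)
	s.Sample(200 * time.Millisecond)
	fmt.Println(s.millisSinceLastTransition, atomic.LoadInt64(&st.transitions))
}
```

Because the sampler writes only to its own fields, the store's counter has a single writer and needs no lock.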
##########
File path: sdks/go/pkg/beam/core/metrics/sampler.go
##########
@@ -0,0 +1,55 @@
+package metrics
+
+import (
+ "context"
+ "time"
+)
+
+// StateSampler tracks the state of a bundle.
+type StateSampler struct {
+ store *Store // used to store states into state registry
+ millisSinceLastTransition time.Duration
+ transitionsAtLastSample int64
+}
+
+// NewSampler creates a new state sampler.
+func NewSampler(ctx context.Context, store *Store) StateSampler {
+ return StateSampler{store: store}
+}
+
+func (s *StateSampler) Sample(ctx context.Context, t time.Duration) {
+ ps := loadCurrentState(ctx)
+ s.store.mu.Lock()
+ defer s.store.mu.Unlock()
+
+ if v, ok := s.store.stateRegistry[ps.pid]; ok {
+ v[ps.state].TotalTime += t
+ v[TotalBundle].TotalTime += t
+
+ e := s.store
Review comment:
This doesn't make the code more readable, since `e` is only used once below anyway (if at all).
##########
File path: sdks/go/pkg/beam/core/metrics/sampler_test.go
##########
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metrics
+
+import (
+ "context"
+ "sync/atomic"
+ "testing"
+ "time"
+)
+
+func checkStateTime(t *testing.T, s StateSampler, label string, sb, pb, fb, tb time.Duration) {
+ t.Helper()
+ r := s.store.stateRegistry
+ v := r[label]
+ if v[StartBundle].TotalTime != sb || v[ProcessBundle].TotalTime != pb || v[FinishBundle].TotalTime != fb || v[TotalBundle].TotalTime != tb {
+ t.Errorf("got: start: %v, process:%v, finish:%v, total:%v; want start: %v, process:%v, finish:%v, total:%v",
+ v[StartBundle].TotalTime, v[ProcessBundle].TotalTime, v[FinishBundle].TotalTime, v[TotalBundle].TotalTime, sb, pb, fb, tb)
+ }
+}
+
+func checkBundleState(ctx context.Context, t *testing.T, s StateSampler, transitions int64, millisSinceLastTransition time.Duration) {
+ t.Helper()
+ e := atomic.LoadInt64(&s.store.transitions)
+ if e != transitions || s.millisSinceLastTransition != millisSinceLastTransition {
+ t.Errorf("number of transitions: %v, want %v \nmillis since last transition: %vms, want %vms", e, transitions, s.millisSinceLastTransition, millisSinceLastTransition)
+ }
+}
+
+func TestSampler(t *testing.T) {
+ ctx := context.Background()
+ bctx := SetBundleID(ctx, "test")
+
+ st := GetStore(bctx)
+ s := NewSampler(bctx, st)
+
+ pctx := SetPTransformID(bctx, "transform")
+ label := "transform"
+
+ SetPTransformState(pctx, StartBundle)
+ s.Sample(bctx, 200*time.Millisecond)
+
+ // validate states and their time till now
+ checkStateTime(t, s, label, 200*time.Millisecond, 0, 0, 200*time.Millisecond)
+ checkBundleState(bctx, t, s, 1, 0)
+
+ SetPTransformState(pctx, ProcessBundle)
+ s.Sample(bctx, 200*time.Millisecond)
+ SetPTransformState(pctx, ProcessBundle)
+ SetPTransformState(pctx, ProcessBundle)
+ s.Sample(bctx, 200*time.Millisecond)
+
+ // validate states and their time till now
+ checkStateTime(t, s, label, 200*time.Millisecond, 400*time.Millisecond, 0, 600*time.Millisecond)
+ checkBundleState(bctx, t, s, 4, 0)
+
+ s.Sample(bctx, 200*time.Millisecond)
+ s.Sample(bctx, 200*time.Millisecond)
+ s.Sample(bctx, 200*time.Millisecond)
+
+ // validate states and their time till now
+ checkStateTime(t, s, label, 200*time.Millisecond, 1000*time.Millisecond, 0, 1200*time.Millisecond)
+ checkBundleState(bctx, t, s, 4, 600*time.Millisecond)
+
+ SetPTransformState(pctx, FinishBundle)
+ s.Sample(bctx, 200*time.Millisecond)
+ // validate states and their time till now
+ checkStateTime(t, s, label, 200*time.Millisecond, 1000*time.Millisecond, 200*time.Millisecond, 1400*time.Millisecond)
+ checkBundleState(bctx, t, s, 5, 0)
+}
+
+func TestSampler_TwoPTransforms(t *testing.T) {
+ ctx := context.Background()
+ bctx := SetBundleID(ctx, "bundle")
+
+ st := GetStore(bctx)
+ s := NewSampler(bctx, st)
+
+ ctxA := SetPTransformID(bctx, "transformA")
+ ctxB := SetPTransformID(bctx, "transformB")
+
+ labelA := "transformA"
+ labelB := "transformB"
+
+ SetPTransformState(ctxA, ProcessBundle)
+ s.Sample(bctx, 200*time.Millisecond)
+
+ // validate states and their time till now
+ checkStateTime(t, s, labelA, 0, 200*time.Millisecond, 0, 200*time.Millisecond)
+ checkStateTime(t, s, labelB, 0, 0, 0, 0)
+ checkBundleState(bctx, t, s, 1, 0)
+
+ SetPTransformState(ctxB, ProcessBundle)
+ s.Sample(bctx, 200*time.Millisecond)
+ SetPTransformState(ctxA, ProcessBundle)
+ SetPTransformState(ctxB, ProcessBundle)
+ s.Sample(bctx, 200*time.Millisecond)
+
+ // validate states and their time till now
+ checkStateTime(t, s, labelA, 0, 200*time.Millisecond, 0, 200*time.Millisecond)
+ checkStateTime(t, s, labelB, 0, 400*time.Millisecond, 0, 400*time.Millisecond)
+ checkBundleState(bctx, t, s, 4, 0)
+
+ s.Sample(bctx, 200*time.Millisecond)
+ s.Sample(bctx, 200*time.Millisecond)
+ s.Sample(bctx, 200*time.Millisecond)
+
+ // validate states and their time till now
+ checkStateTime(t, s, labelA, 0, 200*time.Millisecond, 0, 200*time.Millisecond)
+ checkStateTime(t, s, labelB, 0, 1000*time.Millisecond, 0, 1000*time.Millisecond)
+ checkBundleState(bctx, t, s, 4, 600*time.Millisecond)
+
+ SetPTransformState(ctxA, FinishBundle)
+ s.Sample(bctx, 200*time.Millisecond)
+ SetPTransformState(ctxB, FinishBundle)
+
+ // validate states and their time till now
+ checkStateTime(t, s, labelA, 0, 200*time.Millisecond, 200*time.Millisecond, 400*time.Millisecond)
+ checkStateTime(t, s, labelB, 0, 1000*time.Millisecond, 0, 1000*time.Millisecond)
+ checkBundleState(bctx, t, s, 6, 0)
+}
+
+// goos: darwin
+// goarch: amd64
+// pkg: github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics
+// cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
+// BenchmarkMsec_SetPTransformState-12 27562231 43.36 ns/op 24 B/op 1 allocs/op
Review comment:
We definitely need to get rid of the allocation if we can. Ideally this call makes 0 allocations and runs in single-digit ns/op.
We should be able to do that if we pre-allocate the structs used for each "state" and use `unsafe.Pointer` to store one with `atomic.StorePointer` (and, conversely, `atomic.LoadPointer` in the load call the Sampler makes).
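A sketch of the pre-allocation idea, under stated assumptions: `ptState`, `ptStates`, `newPTStates`, `setState`, `loadCurrentState`, and `setStateAllocs` are hypothetical names, and building the per-state records once per PTransform (for example, when its ID is set on the context) is an assumption about where that would happen. `setState` only publishes a pointer to an existing record with `atomic.StorePointer`, so it should not allocate; `setStateAllocs` checks this with `testing.AllocsPerRun`.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"testing"
	"unsafe"
)

type bundleProcState int

const (
	startBundle bundleProcState = iota
	processBundle
	finishBundle
)

// ptState is an immutable record meaning "PTransform pid is in state".
type ptState struct {
	pid   string
	state bundleProcState
}

// ptStates holds one pre-allocated record per state for a single PTransform.
type ptStates [3]ptState

// newPTStates builds the records once, so later state changes can reuse them.
func newPTStates(pid string) *ptStates {
	return &ptStates{
		{pid: pid, state: startBundle},
		{pid: pid, state: processBundle},
		{pid: pid, state: finishBundle},
	}
}

// Store holds the transition counter and a pointer to the current state.
type Store struct {
	transitions int64          // first field, so it is 64-bit aligned on 32-bit platforms
	current     unsafe.Pointer // always a *ptState; accessed only atomically
}

// setState publishes a pointer to a pre-allocated record; it allocates nothing.
func (st *Store) setState(states *ptStates, state bundleProcState) {
	atomic.StorePointer(&st.current, unsafe.Pointer(&states[state]))
	atomic.AddInt64(&st.transitions, 1)
}

// loadCurrentState is the sampler's read side; it returns nil before the first setState.
func (st *Store) loadCurrentState() *ptState {
	return (*ptState)(atomic.LoadPointer(&st.current))
}

// setStateAllocs reports the average number of allocations per setState call.
func setStateAllocs(st *Store, states *ptStates) float64 {
	return testing.AllocsPerRun(1000, func() {
		st.setState(states, processBundle)
	})
}

func main() {
	st := &Store{}
	states := newPTStates("transformA")
	st.setState(states, processBundle)
	cur := st.loadCurrentState()
	fmt.Println(cur.pid, cur.state, atomic.LoadInt64(&st.transitions))
	fmt.Println("allocs per setState:", setStateAllocs(st, states))
}
```

On Go 1.19 or newer, `atomic.Pointer[ptState]` from `sync/atomic` offers the same load and store without `unsafe`.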
##########
File path: sdks/go/pkg/beam/core/metrics/sampler.go
##########
@@ -0,0 +1,55 @@
+package metrics
+
+import (
+ "context"
+ "time"
+)
+
+// StateSampler tracks the state of a bundle.
+type StateSampler struct {
+ store *Store // used to store states into state registry
+ millisSinceLastTransition time.Duration
+ transitionsAtLastSample int64
+}
+
+// NewSampler creates a new state sampler.
+func NewSampler(ctx context.Context, store *Store) StateSampler {
+ return StateSampler{store: store}
+}
Review comment:
We only needed the context so we could get the store. Since we already have the store, we don't need the context.
##########
File path: sdks/go/pkg/beam/core/metrics/metrics.go
##########
@@ -179,6 +182,7 @@ const (
kindSumCounter
kindDistribution
kindGauge
+ DoFnMsec
Review comment:
kindDoFnMSec
##########
File path: sdks/go/pkg/beam/core/metrics/dumper.go
##########
@@ -74,6 +74,7 @@ func dumperExtractor(store *Store, p func(format string, args ...interface{})) {
GaugeInt64: func(l Labels, v int64, t time.Time) {
m[l] = &gauge{v: v, t: t}
},
+ MsecsInt64: func(labels string, stateRegistry [4]*ExecutionState) {},
Review comment:
Add a comment here saying that `MsecsInt64` extraction is tested in <test name here>.
##########
File path: sdks/go/pkg/beam/core/metrics/store.go
##########
@@ -145,16 +152,68 @@ type ptCounterSet struct {
gauges map[nameHash]*gauge
}
+// Bundle processing state (START_BUNDLE, PROCESS_BUNDLE, FINISH_BUNDLE)
+type bundleProcState int
+
+const (
+ StartBundle bundleProcState = 0
+ ProcessBundle bundleProcState = 1
+ FinishBundle bundleProcState = 2
+ TotalBundle bundleProcState = 3
+)
+
+// ExecutionState stores the information about a bundle in a particular state.
+type ExecutionState struct {
+ State bundleProcState
+ IsProcessing bool // set to true when sent as a response to ProcessBundleProgress Request
+ TotalTime time.Duration
+}
+
+// BundleState stores information about a PTransform for execution time metrics.
+type BundleState struct {
+ pid string
+ currentState bundleProcState
+}
+
+// SetPTransformState stores the state of PTransform in its bundle.
+func SetPTransformState(ctx context.Context, state bundleProcState) {
+ if bctx, ok := ctx.(*beamCtx); ok {
+ pid := bctx.ptransformID
+ bctx.store.bundleState.Store(BundleState{pid: pid, currentState: state})
+ atomic.AddInt64(&bctx.store.transitions, 1)
+ }
+}
+
+// CurrentStateVal exports the current state of a bundle wrt PTransform.
+type CurrentStateVal struct {
Review comment:
Note: there's no reason for this type to be exported. Anything in the metrics package can already access and use it, and exported types and fields are how we signal the external API, both to users and to ourselves outside of the package.
##########
File path: sdks/go/pkg/beam/core/metrics/sampler_test.go
##########
@@ -0,0 +1,202 @@
+
+// goos: darwin
+// goarch: amd64
+// pkg: github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics
+// cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
+// BenchmarkMsec_SetPTransformState-12 27562231 43.36 ns/op 24 B/op 1 allocs/op
+func BenchmarkMsec_SetPTransformState(b *testing.B) {
+ ctx := context.Background()
+ bctx := SetBundleID(ctx, "benchmark")
+ pctx := SetPTransformID(bctx, "transform")
+
+ for i := 0; i < b.N; i++ {
+ SetPTransformState(pctx, StartBundle)
+ }
+}
+
+// goos: darwin
+// goarch: amd64
+// pkg: github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics
+// cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
+// BenchmarkMsec_Sample-12 35614332 33.98 ns/op 0 B/op 0 allocs/op
Review comment:
This is excellent, and what we want to see for the infrequent sampler
calls.