[
https://issues.apache.org/jira/browse/BEAM-4727?focusedWorklogId=122039&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-122039
]
ASF GitHub Bot logged work on BEAM-4727:
----------------------------------------
Author: ASF GitHub Bot
Created on: 11/Jul/18 20:43
Start Date: 11/Jul/18 20:43
Worklog Time Spent: 10m
Work Description: herohde closed pull request #5885: [BEAM-4727] Improve
metric lookup overhead for predeclared metrics
URL: https://github.com/apache/beam/pull/5885
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/sdks/go/pkg/beam/core/metrics/metrics.go
b/sdks/go/pkg/beam/core/metrics/metrics.go
index 73ef73a36b6..1899cc89971 100644
--- a/sdks/go/pkg/beam/core/metrics/metrics.go
+++ b/sdks/go/pkg/beam/core/metrics/metrics.go
@@ -130,12 +130,12 @@ var (
// simplifies code understanding, since each only contains a single
type of
// cell.
- // counters is a map[key]*counter
- counters = sync.Map{}
- // distributions is a map[key]*distribution
- distributions = sync.Map{}
- // gauges is a map[key]*gauge
- gauges = sync.Map{}
+ countersMu sync.RWMutex
+ counters = make(map[key]*counter)
+ distributionsMu sync.RWMutex
+ distributions = make(map[key]*distribution)
+ gaugesMu sync.RWMutex
+ gauges = make(map[key]*gauge)
)
// TODO(lostluck): 2018/03/05 Use a common internal beam now() instead, once
that exists.
@@ -144,37 +144,54 @@ var now = time.Now
// Counter is a simple counter for incrementing and decrementing a value.
type Counter struct {
name name
+ // The following are a fast cache of the key and storage
+ mu sync.Mutex
+ k key
+ c *counter
}
-func (m Counter) String() string {
+func (m *Counter) String() string {
return fmt.Sprintf("Counter metric %s", m.name)
}
// NewCounter returns the Counter with the given namespace and name.
-func NewCounter(ns, n string) Counter {
+func NewCounter(ns, n string) *Counter {
mn := newName(ns, n)
- return Counter{
+ return &Counter{
name: mn,
}
}
// Inc increments the counter within the given PTransform context by v.
-func (m Counter) Inc(ctx context.Context, v int64) {
+func (m *Counter) Inc(ctx context.Context, v int64) {
key := getContextKey(ctx, m.name)
cs := &counter{
value: v,
}
- if m, loaded := counters.LoadOrStore(key, cs); loaded {
- c := m.(*counter)
+ m.mu.Lock()
+ if m.k == key {
+ m.c.inc(v)
+ m.mu.Unlock()
+ return
+ }
+ m.k = key
+ countersMu.Lock()
+ if c, loaded := counters[key]; loaded {
+ m.c = c
+ countersMu.Unlock()
+ m.mu.Unlock()
c.inc(v)
- } else {
- c := m.(*counter)
- storeMetric(key, c)
+ return
}
+ m.c = cs
+ counters[key] = cs
+ countersMu.Unlock()
+ m.mu.Unlock()
+ storeMetric(key, cs)
}
// Dec decrements the counter within the given PTransform context by v.
-func (m Counter) Dec(ctx context.Context, v int64) {
+func (m *Counter) Dec(ctx context.Context, v int64) {
m.Inc(ctx, -v)
}
@@ -211,22 +228,27 @@ func (m *counter) toProto() *fnexecution_v1.Metrics_User {
// Distribution is a simple distribution of values.
type Distribution struct {
name name
+
+ // The following are a fast cache of the key and storage
+ mu sync.Mutex
+ k key
+ d *distribution
}
-func (m Distribution) String() string {
+func (m *Distribution) String() string {
return fmt.Sprintf("Distribution metric %s", m.name)
}
// NewDistribution returns the Distribution with the given namespace and name.
-func NewDistribution(ns, n string) Distribution {
+func NewDistribution(ns, n string) *Distribution {
mn := newName(ns, n)
- return Distribution{
+ return &Distribution{
name: mn,
}
}
// Update updates the distribution within the given PTransform context with v.
-func (m Distribution) Update(ctx context.Context, v int64) {
+func (m *Distribution) Update(ctx context.Context, v int64) {
key := getContextKey(ctx, m.name)
ds := &distribution{
count: 1,
@@ -234,13 +256,26 @@ func (m Distribution) Update(ctx context.Context, v
int64) {
min: v,
max: v,
}
- if m, loaded := distributions.LoadOrStore(key, ds); loaded {
- d := m.(*distribution)
+ m.mu.Lock()
+ if m.k == key {
+ m.d.update(v)
+ m.mu.Unlock()
+ return
+ }
+ m.k = key
+ distributionsMu.Lock()
+ if d, loaded := distributions[key]; loaded {
+ m.d = d
+ distributionsMu.Unlock()
+ m.mu.Unlock()
d.update(v)
- } else {
- d := m.(*distribution)
- storeMetric(key, d)
+ return
}
+ m.d = ds
+ distributions[key] = ds
+ distributionsMu.Unlock()
+ m.mu.Unlock()
+ storeMetric(key, ds)
}
// distribution is a metric cell for distribution values.
@@ -286,37 +321,57 @@ func (m *distribution) toProto()
*fnexecution_v1.Metrics_User {
// Gauge is a time, value pair metric.
type Gauge struct {
name name
+
+ // The following are a fast cache of the key and storage
+ mu sync.Mutex
+ k key
+ g *gauge
}
-func (m Gauge) String() string {
+func (m *Gauge) String() string {
return fmt.Sprintf("Guage metric %s", m.name)
}
// NewGauge returns the Gauge with the given namespace and name.
-func NewGauge(ns, n string) Gauge {
+func NewGauge(ns, n string) *Gauge {
mn := newName(ns, n)
- return Gauge{
+ return &Gauge{
name: mn,
}
}
// Set sets the gauge to the given value, and associates it with the current
time on the clock.
-func (m Gauge) Set(ctx context.Context, v int64) {
+func (m *Gauge) Set(ctx context.Context, v int64) {
key := getContextKey(ctx, m.name)
gs := &gauge{
t: now(),
v: v,
}
- if m, loaded := gauges.LoadOrStore(key, gs); loaded {
- g := m.(*gauge)
+ m.mu.Lock()
+ if m.k == key {
+ m.g.set(v)
+ m.mu.Unlock()
+ return
+ }
+ m.k = key
+ gaugesMu.Lock()
+ if g, loaded := gauges[key]; loaded {
+ m.g = g
+ gaugesMu.Unlock()
+ m.mu.Unlock()
g.set(v)
- } else {
- g := m.(*gauge)
- storeMetric(key, g)
+ return
}
+ m.g = gs
+ gauges[key] = gs
+ gaugesMu.Unlock()
+ m.mu.Unlock()
+ storeMetric(key, gs)
}
// storeMetric stores a metric away on its first use so it may be retrieved
later on.
+// In the event of a name collision, storeMetric can panic, so it's prudent to
release
+// locks if they are no longer required.
func storeMetric(key key, m userMetric) {
mu.Lock()
defer mu.Unlock()
@@ -402,6 +457,12 @@ func DumpToOut() {
func dumpTo(p func(format string, args ...interface{})) {
mu.RLock()
defer mu.RUnlock()
+ countersMu.RLock()
+ defer countersMu.RUnlock()
+ distributionsMu.RLock()
+ defer distributionsMu.RUnlock()
+ gaugesMu.RLock()
+ defer gaugesMu.RUnlock()
var bs []string
for b := range store {
bs = append(bs, b)
@@ -430,13 +491,13 @@ func dumpTo(p func(format string, args ...interface{})) {
p("Bundle: %q - PTransformID: %q", b, pt)
for _, n := range ns {
key := key{name: n, bundle: b, ptransform: pt}
- if m, ok := counters.Load(key); ok {
+ if m, ok := counters[key]; ok {
p("\t%s - %s", key.name, m)
}
- if m, ok := distributions.Load(key); ok {
+ if m, ok := distributions[key]; ok {
p("\t%s - %s", key.name, m)
}
- if m, ok := gauges.Load(key); ok {
+ if m, ok := gauges[key]; ok {
p("\t%s - %s", key.name, m)
}
}
@@ -449,9 +510,9 @@ func dumpTo(p func(format string, args ...interface{})) {
func Clear() {
mu.Lock()
store = make(map[string]map[string]map[name]userMetric)
- counters = sync.Map{}
- distributions = sync.Map{}
- gauges = sync.Map{}
+ counters = make(map[key]*counter)
+ distributions = make(map[key]*distribution)
+ gauges = make(map[key]*gauge)
mu.Unlock()
}
@@ -462,14 +523,20 @@ func ClearBundleData(b string) {
// and the metric cell sync.Maps are goroutine safe.
mu.Lock()
pts := store[b]
+ countersMu.Lock()
+ distributionsMu.Lock()
+ gaugesMu.Lock()
for pt, m := range pts {
for n := range m {
key := key{name: n, bundle: b, ptransform: pt}
- counters.Delete(key)
- distributions.Delete(key)
- gauges.Delete(key)
+ delete(counters, key)
+ delete(distributions, key)
+ delete(gauges, key)
}
}
+ countersMu.Unlock()
+ distributionsMu.Unlock()
+ gaugesMu.Unlock()
delete(store, b)
mu.Unlock()
}
diff --git a/sdks/go/pkg/beam/core/metrics/metrics_test.go
b/sdks/go/pkg/beam/core/metrics/metrics_test.go
index 80724df2580..6e289814270 100644
--- a/sdks/go/pkg/beam/core/metrics/metrics_test.go
+++ b/sdks/go/pkg/beam/core/metrics/metrics_test.go
@@ -64,11 +64,12 @@ func TestCounter_Inc(t *testing.T) {
m.Inc(ctx, test.inc)
key := key{name: name{namespace: test.ns, name:
test.n}, bundle: bID, ptransform: test.key}
- q, ok := counters.Load(key)
+ countersMu.Lock()
+ c, ok := counters[key]
+ countersMu.Unlock()
if !ok {
t.Fatalf("Unable to find Counter for
key %v", key)
}
- c := q.(*counter)
if got, want := c.value, test.value; got !=
want {
t.Errorf("GetCounter(%q,%q).Inc(%s, %d)
c.value got %v, want %v", test.ns, test.n, test.key, test.inc, got, want)
}
@@ -108,11 +109,12 @@ func TestCounter_Dec(t *testing.T) {
m.Dec(ctx, test.dec)
key := key{name: name{namespace: test.ns, name:
test.n}, bundle: bID, ptransform: test.key}
- q, ok := counters.Load(key)
+ countersMu.Lock()
+ c, ok := counters[key]
+ countersMu.Unlock()
if !ok {
t.Fatalf("Unable to find Counter for
key %v", key)
}
- c := q.(*counter)
if got, want := c.value, test.value; got !=
want {
t.Errorf("GetCounter(%q,%q).Dec(%s, %d)
c.value got %v, want %v", test.ns, test.n, test.key, test.dec, got, want)
}
@@ -157,11 +159,12 @@ func TestDistribution_Update(t *testing.T) {
m.Update(ctx, test.v)
key := key{name: name{namespace: test.ns, name:
test.n}, bundle: bID, ptransform: test.key}
- q, ok := distributions.Load(key)
+ distributionsMu.Lock()
+ d, ok := distributions[key]
+ distributionsMu.Unlock()
if !ok {
t.Fatalf("Unable to find Distribution
for key %v", key)
}
- d := q.(*distribution)
if got, want := d.count, test.count; got !=
want {
t.Errorf("GetDistribution(%q,%q).Update(%s, %d) d.count got %v, want %v",
test.ns, test.n, test.key, test.v, got, want)
}
@@ -215,11 +218,12 @@ func TestGauge_Set(t *testing.T) {
m.Set(ctx, test.v)
key := key{name: name{namespace: test.ns, name:
test.n}, bundle: bID, ptransform: test.key}
- q, ok := gauges.Load(key)
+ gaugesMu.Lock()
+ g, ok := gauges[key]
+ gaugesMu.Unlock()
if !ok {
t.Fatalf("Unable to find Gauge for key
%v", key)
}
- g := q.(*gauge)
if got, want := g.v, test.v; got != want {
t.Errorf("GetGauge(%q,%q).Set(%s, %d)
g.v got %v, want %v", test.ns, test.n, test.key, test.v, got, want)
}
@@ -356,3 +360,39 @@ func TestClearBundleData(t *testing.T) {
t.Fatalf("len(ToProto(%q, %q)) = %v, want %v - newOP: %v",
otherBundleID, pt, got, want, newOP)
}
}
+
+// Run on @lostluck's desktop:
+//
+// BenchmarkMetrics/counter_inplace-12 5000000
243 ns/op 128 B/op 2 allocs/op
+// BenchmarkMetrics/distribution_inplace-12 5000000
252 ns/op 160 B/op 2 allocs/op
+// BenchmarkMetrics/gauge_inplace-12 5000000
266 ns/op 160 B/op 2 allocs/op
+// BenchmarkMetrics/counter_predeclared-12 20000000
74.3 ns/op 16 B/op 1 allocs/op
+// BenchmarkMetrics/distribution_predeclared-12 20000000
79.6 ns/op 48 B/op 1 allocs/op
+// BenchmarkMetrics/gauge_predeclared-12 20000000
92.9 ns/op 48 B/op 1 allocs/op
+func BenchmarkMetrics(b *testing.B) {
+ Clear()
+ pt, c, d, g := "bench.bundle.data", "counter", "distribution", "gauge"
+ aBundleID := "benchBID"
+ ctx := ctxWith(aBundleID, pt)
+ count := NewCounter(pt, c)
+ dist := NewDistribution(pt, d)
+ gauge := NewGauge(pt, g)
+ tests := []struct {
+ name string
+ call func()
+ }{
+ {"counter_inplace", func() { NewCounter(pt, c).Inc(ctx, 1) }},
+ {"distribution_inplace", func() { NewDistribution(pt,
d).Update(ctx, 1) }},
+ {"gauge_inplace", func() { NewGauge(pt, g).Set(ctx, 1) }},
+ {"counter_predeclared", func() { count.Inc(ctx, 1) }},
+ {"distribution_predeclared", func() { dist.Update(ctx, 1) }},
+ {"gauge_predeclared", func() { gauge.Set(ctx, 1) }},
+ }
+ for _, test := range tests {
+ b.Run(test.name, func(b *testing.B) {
+ for i := 0; i < b.N; i++ {
+ test.call()
+ }
+ })
+ }
+}
diff --git a/sdks/go/pkg/beam/metrics.go b/sdks/go/pkg/beam/metrics.go
index 5e5bef6ecba..67da5b1f44b 100644
--- a/sdks/go/pkg/beam/metrics.go
+++ b/sdks/go/pkg/beam/metrics.go
@@ -24,7 +24,7 @@ import (
// Counter is a metric that can be incremented and decremented,
// and is aggregated by the sum.
type Counter struct {
- metrics.Counter
+ *metrics.Counter
}
// Inc increments the counter within by the given amount.
@@ -45,7 +45,7 @@ func NewCounter(namespace, name string) Counter {
// Distribution is a metric that records various statistics about the
distribution
// of reported values.
type Distribution struct {
- metrics.Distribution
+ *metrics.Distribution
}
// Update adds an observation to this distribution.
@@ -61,7 +61,7 @@ func NewDistribution(namespace, name string) Distribution {
// Gauge is a metric that can have its new value set, and is aggregated by
taking
// the last reported value.
type Gauge struct {
- metrics.Gauge
+ *metrics.Gauge
}
// Set sets the current value for this gauge.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 122039)
Time Spent: 2h 40m (was: 2.5h)
> Reduce metrics overhead
> -----------------------
>
> Key: BEAM-4727
> URL: https://issues.apache.org/jira/browse/BEAM-4727
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-go
> Reporter: Robert Burke
> Assignee: Robert Burke
> Priority: Major
> Time Spent: 2h 40m
> Remaining Estimate: 0h
>
> There are a few opportunities to avoid metrics overhead.
> First when setting state in the context, we allocate a new one for the stored
> value, per element. This generates a fair amount of objects for the garbage
> collector too. If we retain and re-use contexts within a bundle, we would
> have the opportunity to save on these costs.
> Also, it's possible that we have overhead on the metric updating paths. We
> can possibly do better than the general sync.Map, and avoid the type
> assertion cost for extracting values of known types from the maps.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)