[ 
https://issues.apache.org/jira/browse/BEAM-4727?focusedWorklogId=122039&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-122039
 ]

ASF GitHub Bot logged work on BEAM-4727:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Jul/18 20:43
            Start Date: 11/Jul/18 20:43
    Worklog Time Spent: 10m 
      Work Description: herohde closed pull request #5885: [BEAM-4727] Improve 
metric lookup overhead for predeclared metrics
URL: https://github.com/apache/beam/pull/5885
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/go/pkg/beam/core/metrics/metrics.go 
b/sdks/go/pkg/beam/core/metrics/metrics.go
index 73ef73a36b6..1899cc89971 100644
--- a/sdks/go/pkg/beam/core/metrics/metrics.go
+++ b/sdks/go/pkg/beam/core/metrics/metrics.go
@@ -130,12 +130,12 @@ var (
        // simplifies code understanding, since each only contains a single 
type of
        // cell.
 
-       // counters is a map[key]*counter
-       counters = sync.Map{}
-       // distributions is a map[key]*distribution
-       distributions = sync.Map{}
-       // gauges is a map[key]*gauge
-       gauges = sync.Map{}
+       countersMu      sync.RWMutex
+       counters        = make(map[key]*counter)
+       distributionsMu sync.RWMutex
+       distributions   = make(map[key]*distribution)
+       gaugesMu        sync.RWMutex
+       gauges          = make(map[key]*gauge)
 )
 
 // TODO(lostluck): 2018/03/05 Use a common internal beam now() instead, once 
that exists.
@@ -144,37 +144,54 @@ var now = time.Now
 // Counter is a simple counter for incrementing and decrementing a value.
 type Counter struct {
        name name
+       // The following are a fast cache of the key and storage
+       mu sync.Mutex
+       k  key
+       c  *counter
 }
 
-func (m Counter) String() string {
+func (m *Counter) String() string {
        return fmt.Sprintf("Counter metric %s", m.name)
 }
 
 // NewCounter returns the Counter with the given namespace and name.
-func NewCounter(ns, n string) Counter {
+func NewCounter(ns, n string) *Counter {
        mn := newName(ns, n)
-       return Counter{
+       return &Counter{
                name: mn,
        }
 }
 
 // Inc increments the counter within the given PTransform context by v.
-func (m Counter) Inc(ctx context.Context, v int64) {
+func (m *Counter) Inc(ctx context.Context, v int64) {
        key := getContextKey(ctx, m.name)
        cs := &counter{
                value: v,
        }
-       if m, loaded := counters.LoadOrStore(key, cs); loaded {
-               c := m.(*counter)
+       m.mu.Lock()
+       if m.k == key {
+               m.c.inc(v)
+               m.mu.Unlock()
+               return
+       }
+       m.k = key
+       countersMu.Lock()
+       if c, loaded := counters[key]; loaded {
+               m.c = c
+               countersMu.Unlock()
+               m.mu.Unlock()
                c.inc(v)
-       } else {
-               c := m.(*counter)
-               storeMetric(key, c)
+               return
        }
+       m.c = cs
+       counters[key] = cs
+       countersMu.Unlock()
+       m.mu.Unlock()
+       storeMetric(key, cs)
 }
 
 // Dec decrements the counter within the given PTransform context by v.
-func (m Counter) Dec(ctx context.Context, v int64) {
+func (m *Counter) Dec(ctx context.Context, v int64) {
        m.Inc(ctx, -v)
 }
 
@@ -211,22 +228,27 @@ func (m *counter) toProto() *fnexecution_v1.Metrics_User {
 // Distribution is a simple distribution of values.
 type Distribution struct {
        name name
+
+       // The following are a fast cache of the key and storage
+       mu sync.Mutex
+       k  key
+       d  *distribution
 }
 
-func (m Distribution) String() string {
+func (m *Distribution) String() string {
        return fmt.Sprintf("Distribution metric %s", m.name)
 }
 
 // NewDistribution returns the Distribution with the given namespace and name.
-func NewDistribution(ns, n string) Distribution {
+func NewDistribution(ns, n string) *Distribution {
        mn := newName(ns, n)
-       return Distribution{
+       return &Distribution{
                name: mn,
        }
 }
 
 // Update updates the distribution within the given PTransform context with v.
-func (m Distribution) Update(ctx context.Context, v int64) {
+func (m *Distribution) Update(ctx context.Context, v int64) {
        key := getContextKey(ctx, m.name)
        ds := &distribution{
                count: 1,
@@ -234,13 +256,26 @@ func (m Distribution) Update(ctx context.Context, v 
int64) {
                min:   v,
                max:   v,
        }
-       if m, loaded := distributions.LoadOrStore(key, ds); loaded {
-               d := m.(*distribution)
+       m.mu.Lock()
+       if m.k == key {
+               m.d.update(v)
+               m.mu.Unlock()
+               return
+       }
+       m.k = key
+       distributionsMu.Lock()
+       if d, loaded := distributions[key]; loaded {
+               m.d = d
+               distributionsMu.Unlock()
+               m.mu.Unlock()
                d.update(v)
-       } else {
-               d := m.(*distribution)
-               storeMetric(key, d)
+               return
        }
+       m.d = ds
+       distributions[key] = ds
+       distributionsMu.Unlock()
+       m.mu.Unlock()
+       storeMetric(key, ds)
 }
 
 // distribution is a metric cell for distribution values.
@@ -286,37 +321,57 @@ func (m *distribution) toProto() 
*fnexecution_v1.Metrics_User {
 // Gauge is a time, value pair metric.
 type Gauge struct {
        name name
+
+       // The following are a fast cache of the key and storage
+       mu sync.Mutex
+       k  key
+       g  *gauge
 }
 
-func (m Gauge) String() string {
+func (m *Gauge) String() string {
        return fmt.Sprintf("Guage metric %s", m.name)
 }
 
 // NewGauge returns the Gauge with the given namespace and name.
-func NewGauge(ns, n string) Gauge {
+func NewGauge(ns, n string) *Gauge {
        mn := newName(ns, n)
-       return Gauge{
+       return &Gauge{
                name: mn,
        }
 }
 
 // Set sets the gauge to the given value, and associates it with the current 
time on the clock.
-func (m Gauge) Set(ctx context.Context, v int64) {
+func (m *Gauge) Set(ctx context.Context, v int64) {
        key := getContextKey(ctx, m.name)
        gs := &gauge{
                t: now(),
                v: v,
        }
-       if m, loaded := gauges.LoadOrStore(key, gs); loaded {
-               g := m.(*gauge)
+       m.mu.Lock()
+       if m.k == key {
+               m.g.set(v)
+               m.mu.Unlock()
+               return
+       }
+       m.k = key
+       gaugesMu.Lock()
+       if g, loaded := gauges[key]; loaded {
+               m.g = g
+               gaugesMu.Unlock()
+               m.mu.Unlock()
                g.set(v)
-       } else {
-               g := m.(*gauge)
-               storeMetric(key, g)
+               return
        }
+       m.g = gs
+       gauges[key] = gs
+       gaugesMu.Unlock()
+       m.mu.Unlock()
+       storeMetric(key, gs)
 }
 
 // storeMetric stores a metric away on its first use so it may be retrieved 
later on.
+// In the event of a name collision, storeMetric can panic, so it's prudent to 
release
+// locks if they are no longer required.
 func storeMetric(key key, m userMetric) {
        mu.Lock()
        defer mu.Unlock()
@@ -402,6 +457,12 @@ func DumpToOut() {
 func dumpTo(p func(format string, args ...interface{})) {
        mu.RLock()
        defer mu.RUnlock()
+       countersMu.RLock()
+       defer countersMu.RUnlock()
+       distributionsMu.RLock()
+       defer distributionsMu.RUnlock()
+       gaugesMu.RLock()
+       defer gaugesMu.RUnlock()
        var bs []string
        for b := range store {
                bs = append(bs, b)
@@ -430,13 +491,13 @@ func dumpTo(p func(format string, args ...interface{})) {
                        p("Bundle: %q - PTransformID: %q", b, pt)
                        for _, n := range ns {
                                key := key{name: n, bundle: b, ptransform: pt}
-                               if m, ok := counters.Load(key); ok {
+                               if m, ok := counters[key]; ok {
                                        p("\t%s - %s", key.name, m)
                                }
-                               if m, ok := distributions.Load(key); ok {
+                               if m, ok := distributions[key]; ok {
                                        p("\t%s - %s", key.name, m)
                                }
-                               if m, ok := gauges.Load(key); ok {
+                               if m, ok := gauges[key]; ok {
                                        p("\t%s - %s", key.name, m)
                                }
                        }
@@ -449,9 +510,9 @@ func dumpTo(p func(format string, args ...interface{})) {
 func Clear() {
        mu.Lock()
        store = make(map[string]map[string]map[name]userMetric)
-       counters = sync.Map{}
-       distributions = sync.Map{}
-       gauges = sync.Map{}
+       counters = make(map[key]*counter)
+       distributions = make(map[key]*distribution)
+       gauges = make(map[key]*gauge)
        mu.Unlock()
 }
 
@@ -462,14 +523,20 @@ func ClearBundleData(b string) {
        // and the metric cell sync.Maps are goroutine safe.
        mu.Lock()
        pts := store[b]
+       countersMu.Lock()
+       distributionsMu.Lock()
+       gaugesMu.Lock()
        for pt, m := range pts {
                for n := range m {
                        key := key{name: n, bundle: b, ptransform: pt}
-                       counters.Delete(key)
-                       distributions.Delete(key)
-                       gauges.Delete(key)
+                       delete(counters, key)
+                       delete(distributions, key)
+                       delete(gauges, key)
                }
        }
+       countersMu.Unlock()
+       distributionsMu.Unlock()
+       gaugesMu.Unlock()
        delete(store, b)
        mu.Unlock()
 }
diff --git a/sdks/go/pkg/beam/core/metrics/metrics_test.go 
b/sdks/go/pkg/beam/core/metrics/metrics_test.go
index 80724df2580..6e289814270 100644
--- a/sdks/go/pkg/beam/core/metrics/metrics_test.go
+++ b/sdks/go/pkg/beam/core/metrics/metrics_test.go
@@ -64,11 +64,12 @@ func TestCounter_Inc(t *testing.T) {
                                m.Inc(ctx, test.inc)
 
                                key := key{name: name{namespace: test.ns, name: 
test.n}, bundle: bID, ptransform: test.key}
-                               q, ok := counters.Load(key)
+                               countersMu.Lock()
+                               c, ok := counters[key]
+                               countersMu.Unlock()
                                if !ok {
                                        t.Fatalf("Unable to find Counter for 
key %v", key)
                                }
-                               c := q.(*counter)
                                if got, want := c.value, test.value; got != 
want {
                                        t.Errorf("GetCounter(%q,%q).Inc(%s, %d) 
c.value got %v, want %v", test.ns, test.n, test.key, test.inc, got, want)
                                }
@@ -108,11 +109,12 @@ func TestCounter_Dec(t *testing.T) {
                                m.Dec(ctx, test.dec)
 
                                key := key{name: name{namespace: test.ns, name: 
test.n}, bundle: bID, ptransform: test.key}
-                               q, ok := counters.Load(key)
+                               countersMu.Lock()
+                               c, ok := counters[key]
+                               countersMu.Unlock()
                                if !ok {
                                        t.Fatalf("Unable to find Counter for 
key %v", key)
                                }
-                               c := q.(*counter)
                                if got, want := c.value, test.value; got != 
want {
                                        t.Errorf("GetCounter(%q,%q).Dec(%s, %d) 
c.value got %v, want %v", test.ns, test.n, test.key, test.dec, got, want)
                                }
@@ -157,11 +159,12 @@ func TestDistribution_Update(t *testing.T) {
                                m.Update(ctx, test.v)
 
                                key := key{name: name{namespace: test.ns, name: 
test.n}, bundle: bID, ptransform: test.key}
-                               q, ok := distributions.Load(key)
+                               distributionsMu.Lock()
+                               d, ok := distributions[key]
+                               distributionsMu.Unlock()
                                if !ok {
                                        t.Fatalf("Unable to find Distribution 
for key %v", key)
                                }
-                               d := q.(*distribution)
                                if got, want := d.count, test.count; got != 
want {
                                        
t.Errorf("GetDistribution(%q,%q).Update(%s, %d) d.count got %v, want %v", 
test.ns, test.n, test.key, test.v, got, want)
                                }
@@ -215,11 +218,12 @@ func TestGauge_Set(t *testing.T) {
                                m.Set(ctx, test.v)
 
                                key := key{name: name{namespace: test.ns, name: 
test.n}, bundle: bID, ptransform: test.key}
-                               q, ok := gauges.Load(key)
+                               gaugesMu.Lock()
+                               g, ok := gauges[key]
+                               gaugesMu.Unlock()
                                if !ok {
                                        t.Fatalf("Unable to find Gauge for key 
%v", key)
                                }
-                               g := q.(*gauge)
                                if got, want := g.v, test.v; got != want {
                                        t.Errorf("GetGauge(%q,%q).Set(%s, %d) 
g.v got %v, want %v", test.ns, test.n, test.key, test.v, got, want)
                                }
@@ -356,3 +360,39 @@ func TestClearBundleData(t *testing.T) {
                t.Fatalf("len(ToProto(%q, %q)) = %v, want %v - newOP: %v", 
otherBundleID, pt, got, want, newOP)
        }
 }
+
+// Run on @lostluck's desktop:
+//
+// BenchmarkMetrics/counter_inplace-12                  5000000               
243 ns/op             128 B/op          2 allocs/op
+// BenchmarkMetrics/distribution_inplace-12             5000000               
252 ns/op             160 B/op          2 allocs/op
+// BenchmarkMetrics/gauge_inplace-12                    5000000               
266 ns/op             160 B/op          2 allocs/op
+// BenchmarkMetrics/counter_predeclared-12             20000000                
74.3 ns/op            16 B/op          1 allocs/op
+// BenchmarkMetrics/distribution_predeclared-12                20000000        
        79.6 ns/op            48 B/op          1 allocs/op
+// BenchmarkMetrics/gauge_predeclared-12                       20000000        
        92.9 ns/op            48 B/op          1 allocs/op
+func BenchmarkMetrics(b *testing.B) {
+       Clear()
+       pt, c, d, g := "bench.bundle.data", "counter", "distribution", "gauge"
+       aBundleID := "benchBID"
+       ctx := ctxWith(aBundleID, pt)
+       count := NewCounter(pt, c)
+       dist := NewDistribution(pt, d)
+       gauge := NewGauge(pt, g)
+       tests := []struct {
+               name string
+               call func()
+       }{
+               {"counter_inplace", func() { NewCounter(pt, c).Inc(ctx, 1) }},
+               {"distribution_inplace", func() { NewDistribution(pt, 
d).Update(ctx, 1) }},
+               {"gauge_inplace", func() { NewGauge(pt, g).Set(ctx, 1) }},
+               {"counter_predeclared", func() { count.Inc(ctx, 1) }},
+               {"distribution_predeclared", func() { dist.Update(ctx, 1) }},
+               {"gauge_predeclared", func() { gauge.Set(ctx, 1) }},
+       }
+       for _, test := range tests {
+               b.Run(test.name, func(b *testing.B) {
+                       for i := 0; i < b.N; i++ {
+                               test.call()
+                       }
+               })
+       }
+}
diff --git a/sdks/go/pkg/beam/metrics.go b/sdks/go/pkg/beam/metrics.go
index 5e5bef6ecba..67da5b1f44b 100644
--- a/sdks/go/pkg/beam/metrics.go
+++ b/sdks/go/pkg/beam/metrics.go
@@ -24,7 +24,7 @@ import (
 // Counter is a metric that can be incremented and decremented,
 // and is aggregated by the sum.
 type Counter struct {
-       metrics.Counter
+       *metrics.Counter
 }
 
 // Inc increments the counter within by the given amount.
@@ -45,7 +45,7 @@ func NewCounter(namespace, name string) Counter {
 // Distribution is a metric that records various statistics about the 
distribution
 // of reported values.
 type Distribution struct {
-       metrics.Distribution
+       *metrics.Distribution
 }
 
 // Update adds an observation to this distribution.
@@ -61,7 +61,7 @@ func NewDistribution(namespace, name string) Distribution {
 // Gauge is a metric that can have its new value set, and is aggregated by 
taking
 // the last reported value.
 type Gauge struct {
-       metrics.Gauge
+       *metrics.Gauge
 }
 
 // Set sets the current value for this gauge.


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 122039)
    Time Spent: 2h 40m  (was: 2.5h)

> Reduce metrics overhead
> -----------------------
>
>                 Key: BEAM-4727
>                 URL: https://issues.apache.org/jira/browse/BEAM-4727
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Robert Burke
>            Assignee: Robert Burke
>            Priority: Major
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> There are a few opportunities to avoid metrics overhead.
> First when setting state in the context, we allocate a new one for the stored 
> value, per element. This generates a fair amount of objects for the garbage 
> collector too. If we retain and re-use contexts within a bundle, we would 
> have the opportunity to save on these costs.
> Also, it's possible that we have overhead on the metric updating paths. We 
> can possibly do better than the general sync.Map, and avoid the type 
> assertion cost for extracting values of known types from the maps.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to