[ 
https://issues.apache.org/jira/browse/BEAM-9167?focusedWorklogId=380228&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-380228
 ]

ASF GitHub Bot logged work on BEAM-9167:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 31/Jan/20 22:24
            Start Date: 31/Jan/20 22:24
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on pull request #10716: [BEAM-9167] 
Metrics extraction refactoring.
URL: https://github.com/apache/beam/pull/10716#discussion_r373709779
 
 

 ##########
 File path: sdks/go/pkg/beam/core/metrics/metrics.go
 ##########
 @@ -144,63 +119,55 @@ func (ctx *beamCtx) String() string {
        return fmt.Sprintf("beamCtx[%s;%s]", ctx.bundleID, ctx.ptransformID)
 }
 
-// SetBundleID sets the id of the current Bundle.
+// SetBundleID sets the id of the current Bundle, and populates the store.
 func SetBundleID(ctx context.Context, id string) context.Context {
        // Checking for *beamCtx is an optimization, so we don't dig deeply
        // for ids if not necessary.
        if bctx, ok := ctx.(*beamCtx); ok {
-               return &beamCtx{Context: bctx.Context, bundleID: id, bs: &perBundle{}, ptransformID: bctx.ptransformID}
+               return &beamCtx{Context: bctx.Context, bundleID: id, store: newStore(), ptransformID: bctx.ptransformID}
        }
-       return &beamCtx{Context: ctx, bundleID: id, bs: &perBundle{}}
+       return &beamCtx{Context: ctx, bundleID: id, store: newStore()}
 }
 
 // SetPTransformID sets the id of the current PTransform.
-// Must only be called on a context returened by SetBundleID.
+// Must only be called on a context returned by SetBundleID.
 func SetPTransformID(ctx context.Context, id string) context.Context {
        // Checking for *beamCtx is an optimization, so we don't dig deeply
        // for ids if not necessary.
        if bctx, ok := ctx.(*beamCtx); ok {
-               return &beamCtx{Context: bctx.Context, bundleID: bctx.bundleID, bs: bctx.bs, ptransformID: id}
+               return &beamCtx{Context: bctx.Context, bundleID: bctx.bundleID, store: bctx.store, ptransformID: id}
+       }
+       // Avoid breaking if the bundle is unset in testing.
+       return &beamCtx{Context: ctx, bundleID: bundleIDUnset, store: newStore(), ptransformID: id}
+}
+
+// GetStore extracts the metrics Store for the given context for a bundle.
+//
+// Returns nil if the context doesn't contain a metric Store.
+func GetStore(ctx context.Context) *Store {
+       if bctx, ok := ctx.(*beamCtx); ok {
+               return bctx.store
+       }
+       if v := ctx.Value(storeKey); v != nil {
+               return v.(*Store)
        }
-       panic(fmt.Sprintf("SetPTransformID called before SetBundleID for %v", id))
-       return nil // never runs.
+       return nil
 }
 
 const (
        bundleIDUnset     = "(bundle id unset)"
        ptransformIDUnset = "(ptransform id unset)"
 )
 
-func getContextKey(ctx context.Context, n name) key {
-       key := key{name: n, bundle: bundleIDUnset, ptransform: ptransformIDUnset}
-       if id := ctx.Value(bundleKey); id != nil {
-               key.bundle = id.(string)
-       }
-       if id := ctx.Value(ptransformKey); id != nil {
-               key.ptransform = id.(string)
-       }
-       return key
-}
-
 func getCounterSet(ctx context.Context) *ptCounterSet {
-       if id := ctx.Value(counterSetKey); id != nil {
-               return id.(*ptCounterSet)
+       if bctx, ok := ctx.(*beamCtx); ok && bctx.cs != nil {
+               return bctx.cs
        }
-       // It's not set anywhere and wasn't hoisted, so create it.
-       if bctx, ok := ctx.(*beamCtx); ok {
-               bctx.bs.mu.Lock()
-               cs := &ptCounterSet{
-                       counters:      make(map[nameHash]*counter),
-                       distributions: make(map[nameHash]*distribution),
-                       gauges:        make(map[nameHash]*gauge),
-               }
-               bctx.bs.css = append(bctx.bs.css, cs)
-               bctx.cs = cs
-               bctx.bs.mu.Unlock()
-               return cs
+       if set := ctx.Value(counterSetKey); set != nil {
+               return set.(*ptCounterSet)
        }
-       panic("counterSet missing, beam isn't set up properly.")
-       return nil // never runs.
+       // This isn't a beam context, so we can't store the metric.
 
 Review comment:
   The counterset is what's used to store the metric for bundle access, so it's 
accurate, but you're right that it's confusing. Rewording.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 380228)

> Reduce overhead of Go SDK side metrics
> --------------------------------------
>
>                 Key: BEAM-9167
>                 URL: https://issues.apache.org/jira/browse/BEAM-9167
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Robert Burke
>            Assignee: Robert Burke
>            Priority: Major
>          Time Spent: 3h
>  Remaining Estimate: 0h
>
> Locking overhead due to the global store and local caches of SDK counter data 
> can dominate certain workloads, which means we can do better.
> Instead of having a global store of metrics data to extract counters, we 
> should use per-PTransform (or per-bundle) counter sets, which would avoid 
> taking a lock on every counter operation. The main detriment compared to the 
> current implementation is that a user would need to add their own locking if 
> they were to spawn multiple goroutines to process a Bundle's work in a DoFn.
> Given that self-multithreaded DoFns aren't recommended/safe in Java, are largely 
> impossible in Python, and the other Beam Go SDK-provided constructs (like 
> Iterators and Emitters) are not thread-safe, this is a small concern, 
> provided the documentation is clear on this.
> Removing the locking and switching to atomic ops reduces the overhead 
> significantly in example jobs and in the benchmarks.
> A second part of this change should be to move the exec package to manage 
> its own per-bundle state, rather than relying on a global datastore to 
> extract the per-bundle, per-PTransform values.
> Related: https://issues.apache.org/jira/browse/BEAM-6541 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to