[ 
https://issues.apache.org/jira/browse/BEAM-3545?focusedWorklogId=83699&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83699
 ]

ASF GitHub Bot logged work on BEAM-3545:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Mar/18 17:27
            Start Date: 23/Mar/18 17:27
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on a change in pull request #4899: 
[BEAM-3545] Go SDK UserCounters
URL: https://github.com/apache/beam/pull/4899#discussion_r176808627
 
 

 ##########
 File path: sdks/go/pkg/beam/metrics.go
 ##########
 @@ -0,0 +1,373 @@
+package beam
+
+import (
+       "context"
+       "fmt"
+       "sync"
+       "time"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/ptransform"
+       "github.com/apache/beam/sdks/go/pkg/beam/log"
+       "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+       "github.com/golang/protobuf/ptypes"
+)
+
+type userMetricker interface {
+       toProto() *fnexecution_v1.Metrics_User
+}
+
+type metricName struct {
+       namespace, name string
+}
+
+func (n *metricName) String() string {
+       return fmt.Sprintf("%s.%s", n.namespace, n.name)
+}
+
+func validateName(mn metricName, t metricType) {
+       if len(mn.name) == 0 || len(mn.namespace) == 0 {
+               panic(fmt.Sprintf("namespace and name are required to be 
non-empty, got %q and %q", mn.namespace, mn.name))
+       }
+}
+
+var (
+       metricMu      sync.Mutex
+       metricStorage = make(map[string]map[metricName]userMetricker)
+       counters      = make(map[metricName]*Counter)
+       distributions = make(map[metricName]*Distribution)
+       gauges        = make(map[metricName]*Gauge)
+)
+
+type metricType uint8
+
+const (
+       counterType metricType = iota
+       distributionType
+       gaugeType
+)
+
+func (t metricType) String() string {
+       switch t {
+       case counterType:
+               return "Counter"
+       case distributionType:
+               return "Distribution"
+       case gaugeType:
+               return "Gauge"
+       default:
+               panic(fmt.Sprintf("Unknown metric type value: %v", int(t)))
+       }
+}
+
+// Here's the flow for Metrics:
+// User declares/gets a metric in their DoFn
+//  * Can be done per invocation, or
+//  * Can be done in a Setup for Structural DoFns
+// The metric holds onto all contexts/PTransforms that the metric is being used for
+// The user does whatever increments and updates they desire in their DoFn to the metrics
+// * Counters and Distributions both use a distribution as a backing storage
+// * Gauges use something different for storing the value and the time
+// This backing store is also referenced on a per context/PTransform basis so they can be
+// extracted as protos during ProgressUpdates
+//
+// Metrics are created on demand by user code and are initialized with their first
+// value. This is handy for distributions which never need to worry about the
+// initial min/max values. They're never sent if they're never created.
+//
+// Behaviour is undefined if two metrics with the same name are used
+// with different metric types.
+// Since metrics are only created when they're being used the first time
+// with a context, we can't check for duplicates within a PTransform until both
+// are used.
+//
+// Metrics within a given PTransform.Namespace.Name must be unique, and within that context,
+// may not change types.
+
+type Clocker interface {
+       Now() time.Time
+}
+
+type defaultclock struct{}
+
+func (defaultclock) Now() time.Time {
+       return time.Now()
+}
+
+// TODO(lostluck): 2018/03/05 Use a common internal beam.Clock instead, once that exists.
+var clock Clocker = defaultclock{}
+
+// Counter is a simple counter for incrementing and decrementing a value.
+type Counter struct {
+       name    metricName
+       storage map[string]*distribution
+       mu      sync.Mutex
+}
+
+func (m *Counter) String() string {
+       return fmt.Sprintf("%s metric %s", counterType, &m.name)
+}
+
+// GetCounter returns the Counter with the given namespace and name.
+func GetCounter(namespace, name string) *Counter {
+       mn := metricName{namespace: namespace, name: name}
+       metricMu.Lock()
+       defer metricMu.Unlock()
+       if m, ok := counters[mn]; ok {
+               return m
+       }
+       validateName(mn, counterType)
+       m := &Counter{
+               name:    mn,
+               storage: make(map[string]*distribution),
+       }
+       counters[mn] = m
 
 Review comment:
   Done, this seems like a reasonable use case for sync.Map based on the 
documentation.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 83699)
    Time Spent: 2h  (was: 1h 50m)

> Fn API metrics in Go SDK harness
> --------------------------------
>
>                 Key: BEAM-3545
>                 URL: https://issues.apache.org/jira/browse/BEAM-3545
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Kenneth Knowles
>            Assignee: Robert Burke
>            Priority: Major
>              Labels: portability
>          Time Spent: 2h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to