[
https://issues.apache.org/jira/browse/BEAM-3545?focusedWorklogId=83699&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83699
]
ASF GitHub Bot logged work on BEAM-3545:
----------------------------------------
Author: ASF GitHub Bot
Created on: 23/Mar/18 17:27
Start Date: 23/Mar/18 17:27
Worklog Time Spent: 10m
Work Description: lostluck commented on a change in pull request #4899:
[BEAM-3545] Go SDK UserCounters
URL: https://github.com/apache/beam/pull/4899#discussion_r176808627
##########
File path: sdks/go/pkg/beam/metrics.go
##########
@@ -0,0 +1,373 @@
+package beam
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/core/util/ptransform"
+ "github.com/apache/beam/sdks/go/pkg/beam/log"
+ "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+ "github.com/golang/protobuf/ptypes"
+)
+
+type userMetricker interface {
+ toProto() *fnexecution_v1.Metrics_User
+}
+
+type metricName struct {
+ namespace, name string
+}
+
+func (n *metricName) String() string {
+ return fmt.Sprintf("%s.%s", n.namespace, n.name)
+}
+
+func validateName(mn metricName, t metricType) {
+ if len(mn.name) == 0 || len(mn.namespace) == 0 {
+ panic(fmt.Sprintf("namespace and name are required to be
non-empty, got %q and %q", mn.namespace, mn.name))
+ }
+}
+
+var (
+ metricMu sync.Mutex
+ metricStorage = make(map[string]map[metricName]userMetricker)
+ counters = make(map[metricName]*Counter)
+ distributions = make(map[metricName]*Distribution)
+ gauges = make(map[metricName]*Gauge)
+)
+
+type metricType uint8
+
+const (
+ counterType metricType = iota
+ distributionType
+ gaugeType
+)
+
+func (t metricType) String() string {
+ switch t {
+ case counterType:
+ return "Counter"
+ case distributionType:
+ return "Distribution"
+ case gaugeType:
+ return "Gauge"
+ default:
+ panic(fmt.Sprintf("Unknown metric type value: %v", int(t)))
+ }
+}
+
+// Here's the flow for Metrics:
+// User declares/gets a metric in their DoFn
+// * Can be done per invocation, or
+// * Can be done in a Setup for Structural DoFns
+// The metric holds onto all contexts/PTransforms that the metric is being
used for
+// The user does whatever increments and updates they desire in their DoFn to
the metrics
+// * Counters and Distributions both use a distribution as a backing storage
+// * Gauges use something different for storing the value and the time
+// This backing store is also referenced on a per context/Ptransform basis so
they can be
+// extracted as protos during ProgressUpdates
+//
+// Metrics are created on demand by user code and are initialized with their
first
+// value. This is handy for distributions which never need to worry about the
+// initial min/max values. They're never sent if they're never created.
+//
+// Behaviour is undefined if two counters with the same names are used
+// with different types of metric are used with different types.
+// Since metrics are only created when they're being used the first time
+// with a context, we can't check for duplicates within a PTransform until both
+// are used.
+//
+// Metrics within a given PTransform.Namespace.Name must be unique, and within
that context,
+// may not change types.
+
+type Clocker interface {
+ Now() time.Time
+}
+
+type defaultclock struct{}
+
+func (defaultclock) Now() time.Time {
+ return time.Now()
+}
+
+// TODO(lostluck): 2018/03/05 Use a common internal beam.Clock instead, once
that exists.
+var clock Clocker = defaultclock{}
+
+// Counter is a simple counter for incrementing and decrementing a value.
+type Counter struct {
+ name metricName
+ storage map[string]*distribution
+ mu sync.Mutex
+}
+
+func (m *Counter) String() string {
+ return fmt.Sprintf("%s metric %s", counterType, &m.name)
+}
+
+// GetCounter returns the Counter with the given namespace and name.
+func GetCounter(namespace, name string) *Counter {
+ mn := metricName{namespace: namespace, name: name}
+ metricMu.Lock()
+ defer metricMu.Unlock()
+ if m, ok := counters[mn]; ok {
+ return m
+ }
+ validateName(mn, counterType)
+ m := &Counter{
+ name: mn,
+ storage: make(map[string]*distribution),
+ }
+ counters[mn] = m
Review comment:
Done, this seems like a reasonable usecase for sync.Map based on the
documentation.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 83699)
Time Spent: 2h (was: 1h 50m)
> Fn API metrics in Go SDK harness
> --------------------------------
>
> Key: BEAM-3545
> URL: https://issues.apache.org/jira/browse/BEAM-3545
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-go
> Reporter: Kenneth Knowles
> Assignee: Robert Burke
> Priority: Major
> Labels: portability
> Time Spent: 2h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)