[
https://issues.apache.org/jira/browse/BEAM-3545?focusedWorklogId=85089&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85089
]
ASF GitHub Bot logged work on BEAM-3545:
----------------------------------------
Author: ASF GitHub Bot
Created on: 28/Mar/18 00:31
Start Date: 28/Mar/18 00:31
Worklog Time Spent: 10m
Work Description: tgroh closed pull request #4899: [BEAM-3545] Go SDK
UserCounters
URL: https://github.com/apache/beam/pull/4899
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/sdks/go/examples/wordcount/wordcount.go
b/sdks/go/examples/wordcount/wordcount.go
index cd513a48cff..f28d1254d42 100644
--- a/sdks/go/examples/wordcount/wordcount.go
+++ b/sdks/go/examples/wordcount/wordcount.go
@@ -61,6 +61,7 @@ import (
"fmt"
"log"
"regexp"
+ "strings"
"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
@@ -101,10 +102,18 @@ var (
// returns a PCollection of type string. Also, using named function transforms
allows
// for easy reuse, modular testing, and an improved monitoring experience.
-var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+var (
+ wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+ empty = beam.NewCounter("extract", "emptyLines")
+ lineLen = beam.NewDistribution("extract", "lineLenDistro")
+)
// extractFn is a DoFn that emits the words in a given line.
-func extractFn(line string, emit func(string)) {
+func extractFn(ctx context.Context, line string, emit func(string)) {
+ lineLen.Update(ctx, int64(len(line)))
+ if len(strings.TrimSpace(line)) == 0 {
+ empty.Inc(ctx, 1)
+ }
for _, word := range wordRE.FindAllString(line, -1) {
emit(word)
}
diff --git a/sdks/go/pkg/beam/core/metrics/metrics.go
b/sdks/go/pkg/beam/core/metrics/metrics.go
new file mode 100644
index 00000000000..73ef73a36b6
--- /dev/null
+++ b/sdks/go/pkg/beam/core/metrics/metrics.go
@@ -0,0 +1,475 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package metrics implements the Beam metrics API, described at
+// http://s.apache.org/beam-metrics-api
+//
+// Metrics in the Beam model are uniquely identified by a namespace, a name,
+// and the PTransform context in which they are used. Further, they are
+// reported as a delta against the bundle being processed, so that overcounting
+// doesn't occur if a bundle needs to be retried. Each metric is scoped to
+// its bundle and ptransform.
+//
+// Cells (or metric cells) are defined for each Beam model metric
+// type, and they serve as concurrency-safe storage of a given metric's values.
+// Proxies are exported values representing the metric, for use in user
+// ptransform code. They don't retain their cells, since they don't have
+// the context to be able to store them for export back to the pipeline runner.
+//
+// Metric cells aren't initialized until their first mutation, which
+// follows from the Beam model design, where metrics are only sent for a bundle
+// if they have changed. This is particularly convenient for distributions
which
+// means their min and max fields can be set to the first value on creation
+// rather than have some marker of uninitialized state, which would otherwise
+// need to be checked for on every update.
+//
+// Metric values are implemented as lightweight proxies of the user provided
+// namespace and name. This allows them to be declared globally, and used in
+// any ParDo. Further, as per the design, they can be declared dynamically
+// at runtime.
+//
+// To handle reporting deltas on the metrics by bundle, metrics
+// are keyed by bundleID,PTransformID,namespace, and name, so metrics that
+// are identical except for bundles are treated as distinct, effectively
+// providing per bundle deltas, since a new value cell is used per bundle.
+package metrics
+
+import (
+ "context"
+ "fmt"
+ "sort"
+ "sync"
+ "time"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/log"
+ "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+ "github.com/golang/protobuf/ptypes"
+)
+
+// Metric cells are named and scoped by ptransform, and bundle,
+// the latter two of which are only known at runtime. We propagate
+// the PTransformID and BundleID via a context.Context. Consequently
+// using metrics requires the PTransform have a context.Context
+// argument.
+
+type ctxKey string
+
+const bundleKey ctxKey = "beam:bundle"
+const ptransformKey ctxKey = "beam:ptransform"
+
+// SetBundleID sets the id of the current Bundle.
+func SetBundleID(ctx context.Context, id string) context.Context {
+ return context.WithValue(ctx, bundleKey, id)
+}
+
+// SetPTransformID sets the id of the current PTransform.
+func SetPTransformID(ctx context.Context, id string) context.Context {
+ return context.WithValue(ctx, ptransformKey, id)
+}
+
+func getContextKey(ctx context.Context, n name) key {
+ key := key{name: n, bundle: "(bundle id unset)", ptransform:
"(ptransform id unset)"}
+ if id := ctx.Value(bundleKey); id != nil {
+ key.bundle = id.(string)
+ }
+ if id := ctx.Value(ptransformKey); id != nil {
+ key.ptransform = id.(string)
+ }
+ return key
+}
+
+// userMetric knows how to convert its value to a Metrics_User proto.
+type userMetric interface {
+ toProto() *fnexecution_v1.Metrics_User
+}
+
+// name is a pair of strings identifying a specific metric.
+type name struct {
+ namespace, name string
+}
+
+func (n name) String() string {
+ return fmt.Sprintf("%s.%s", n.namespace, n.name)
+}
+
+func newName(ns, n string) name {
+ if len(n) == 0 || len(ns) == 0 {
+ panic(fmt.Sprintf("namespace and name are required to be
non-empty, got %q and %q", ns, n))
+ }
+ return name{namespace: ns, name: n}
+}
+
+type key struct {
+ name name
+ bundle, ptransform string
+}
+
+var (
+ // mu protects access to store
+ mu sync.RWMutex
+ // store is a map of BundleIDs to PtransformIDs to userMetrics.
+ // it permits us to extract metric protos for runners per data Bundle,
and
+ // per PTransform.
+ store = make(map[string]map[string]map[name]userMetric)
+
+ // We store the user path access to the cells in metric type segregated
+ // sync.Maps. Using sync.Maps lets metrics with disjoint keys have
concurrent
+ // access to the cells, and using separate sync.Map per metric type
+ // simplifies code understanding, since each only contains a single
type of
+ // cell.
+
+ // counters is a map[key]*counter
+ counters = sync.Map{}
+ // distributions is a map[key]*distribution
+ distributions = sync.Map{}
+ // gauges is a map[key]*gauge
+ gauges = sync.Map{}
+)
+
+// TODO(lostluck): 2018/03/05 Use a common internal beam now() instead, once
that exists.
+var now = time.Now
+
+// Counter is a simple counter for incrementing and decrementing a value.
+type Counter struct {
+ name name
+}
+
+func (m Counter) String() string {
+ return fmt.Sprintf("Counter metric %s", m.name)
+}
+
+// NewCounter returns the Counter with the given namespace and name.
+func NewCounter(ns, n string) Counter {
+ mn := newName(ns, n)
+ return Counter{
+ name: mn,
+ }
+}
+
+// Inc increments the counter within the given PTransform context by v.
+func (m Counter) Inc(ctx context.Context, v int64) {
+ key := getContextKey(ctx, m.name)
+ cs := &counter{
+ value: v,
+ }
+ if m, loaded := counters.LoadOrStore(key, cs); loaded {
+ c := m.(*counter)
+ c.inc(v)
+ } else {
+ c := m.(*counter)
+ storeMetric(key, c)
+ }
+}
+
+// Dec decrements the counter within the given PTransform context by v.
+func (m Counter) Dec(ctx context.Context, v int64) {
+ m.Inc(ctx, -v)
+}
+
+// counter is a metric cell for counter values.
+type counter struct {
+ value int64
+ mu sync.Mutex
+}
+
+func (m *counter) inc(v int64) {
+ m.mu.Lock()
+ m.value += v
+ m.mu.Unlock()
+}
+
+func (m *counter) String() string {
+ return fmt.Sprintf("value: %d", m.value)
+}
+
+// toProto returns a Metrics_User populated with the Data messages, but not
the name. The
+// caller needs to populate with the metric's name.
+func (m *counter) toProto() *fnexecution_v1.Metrics_User {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ return &fnexecution_v1.Metrics_User{
+ Data: &fnexecution_v1.Metrics_User_CounterData_{
+ CounterData: &fnexecution_v1.Metrics_User_CounterData{
+ Value: m.value,
+ },
+ },
+ }
+}
+
+// Distribution is a simple distribution of values.
+type Distribution struct {
+ name name
+}
+
+func (m Distribution) String() string {
+ return fmt.Sprintf("Distribution metric %s", m.name)
+}
+
+// NewDistribution returns the Distribution with the given namespace and name.
+func NewDistribution(ns, n string) Distribution {
+ mn := newName(ns, n)
+ return Distribution{
+ name: mn,
+ }
+}
+
+// Update updates the distribution within the given PTransform context with v.
+func (m Distribution) Update(ctx context.Context, v int64) {
+ key := getContextKey(ctx, m.name)
+ ds := &distribution{
+ count: 1,
+ sum: v,
+ min: v,
+ max: v,
+ }
+ if m, loaded := distributions.LoadOrStore(key, ds); loaded {
+ d := m.(*distribution)
+ d.update(v)
+ } else {
+ d := m.(*distribution)
+ storeMetric(key, d)
+ }
+}
+
+// distribution is a metric cell for distribution values.
+type distribution struct {
+ count, sum, min, max int64
+ mu sync.Mutex
+}
+
+func (m *distribution) update(v int64) {
+ m.mu.Lock()
+ if v < m.min {
+ m.min = v
+ }
+ if v > m.max {
+ m.max = v
+ }
+ m.count++
+ m.sum += v
+ m.mu.Unlock()
+}
+
+func (m *distribution) String() string {
+ return fmt.Sprintf("count: %d sum: %d min: %d max: %d", m.count, m.sum,
m.min, m.max)
+}
+
+// toProto returns a Metrics_User populated with the Data messages, but not
the name. The
+// caller needs to populate with the metric's name.
+func (m *distribution) toProto() *fnexecution_v1.Metrics_User {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ return &fnexecution_v1.Metrics_User{
+ Data: &fnexecution_v1.Metrics_User_DistributionData_{
+ DistributionData:
&fnexecution_v1.Metrics_User_DistributionData{
+ Count: m.count,
+ Sum: m.sum,
+ Min: m.min,
+ Max: m.max,
+ },
+ },
+ }
+}
+
+// Gauge is a time, value pair metric.
+type Gauge struct {
+ name name
+}
+
+func (m Gauge) String() string {
+ return fmt.Sprintf("Guage metric %s", m.name)
+}
+
+// NewGauge returns the Gauge with the given namespace and name.
+func NewGauge(ns, n string) Gauge {
+ mn := newName(ns, n)
+ return Gauge{
+ name: mn,
+ }
+}
+
+// Set sets the gauge to the given value, and associates it with the current
time on the clock.
+func (m Gauge) Set(ctx context.Context, v int64) {
+ key := getContextKey(ctx, m.name)
+ gs := &gauge{
+ t: now(),
+ v: v,
+ }
+ if m, loaded := gauges.LoadOrStore(key, gs); loaded {
+ g := m.(*gauge)
+ g.set(v)
+ } else {
+ g := m.(*gauge)
+ storeMetric(key, g)
+ }
+}
+
+// storeMetric stores a metric away on its first use so it may be retrieved
later on.
+func storeMetric(key key, m userMetric) {
+ mu.Lock()
+ defer mu.Unlock()
+ if _, ok := store[key.bundle]; !ok {
+ store[key.bundle] = make(map[string]map[name]userMetric)
+ }
+ if _, ok := store[key.bundle][key.ptransform]; !ok {
+ store[key.bundle][key.ptransform] = make(map[name]userMetric)
+ }
+ if _, ok := store[key.bundle][key.ptransform][key.name]; ok {
+ panic(fmt.Sprintf("metric name %s being reused for a second
metric in a single PTransform", key.name))
+ }
+ store[key.bundle][key.ptransform][key.name] = m
+}
+
+// gauge is a metric cell for gauge values.
+type gauge struct {
+ mu sync.Mutex
+ t time.Time
+ v int64
+}
+
+func (m *gauge) set(v int64) {
+ m.mu.Lock()
+ m.t = now()
+ m.v = v
+ m.mu.Unlock()
+}
+
+func (m *gauge) toProto() *fnexecution_v1.Metrics_User {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ ts, err := ptypes.TimestampProto(m.t)
+ if err != nil {
+ panic(err)
+ }
+ return &fnexecution_v1.Metrics_User{
+ Data: &fnexecution_v1.Metrics_User_GaugeData_{
+ GaugeData: &fnexecution_v1.Metrics_User_GaugeData{
+ Value: m.v,
+ Timestamp: ts,
+ },
+ },
+ }
+}
+
+func (m *gauge) String() string {
+ return fmt.Sprintf("time: %s value: %d", m.t, m.v)
+}
+
+// ToProto exports all collected metrics for the given BundleID and PTransform
ID pair.
+func ToProto(b, pt string) []*fnexecution_v1.Metrics_User {
+ mu.RLock()
+ defer mu.RUnlock()
+ ps := store[b]
+ s := ps[pt]
+ var ret []*fnexecution_v1.Metrics_User
+ for n, m := range s {
+ p := m.toProto()
+ p.MetricName = &fnexecution_v1.Metrics_User_MetricName{
+ Name: n.name,
+ Namespace: n.namespace,
+ }
+ ret = append(ret, p)
+ }
+ return ret
+}
+
+// DumpToLog is a debugging function that outputs all metrics available
locally to beam.Log.
+func DumpToLog(ctx context.Context) {
+ dumpTo(func(format string, args ...interface{}) {
+ log.Errorf(ctx, format, args...)
+ })
+}
+
+// DumpToOut is a debugging function that outputs all metrics available
locally to std out.
+func DumpToOut() {
+ dumpTo(func(format string, args ...interface{}) {
+ fmt.Printf(format+"\n", args...)
+ })
+}
+
+func dumpTo(p func(format string, args ...interface{})) {
+ mu.RLock()
+ defer mu.RUnlock()
+ var bs []string
+ for b := range store {
+ bs = append(bs, b)
+ }
+ sort.Strings(bs)
+ for _, b := range bs {
+ var pts []string
+ for pt := range store[b] {
+ pts = append(pts, pt)
+ }
+ sort.Strings(pts)
+ for _, pt := range pts {
+ var ns []name
+ for n := range store[b][pt] {
+ ns = append(ns, n)
+ }
+ sort.Slice(ns, func(i, j int) bool {
+ if ns[i].namespace < ns[j].namespace {
+ return true
+ }
+ if ns[i].namespace == ns[j].namespace &&
ns[i].name < ns[j].name {
+ return true
+ }
+ return false
+ })
+ p("Bundle: %q - PTransformID: %q", b, pt)
+ for _, n := range ns {
+ key := key{name: n, bundle: b, ptransform: pt}
+ if m, ok := counters.Load(key); ok {
+ p("\t%s - %s", key.name, m)
+ }
+ if m, ok := distributions.Load(key); ok {
+ p("\t%s - %s", key.name, m)
+ }
+ if m, ok := gauges.Load(key); ok {
+ p("\t%s - %s", key.name, m)
+ }
+ }
+ }
+ }
+}
+
+// Clear resets all storage associated with metrics for tests.
+// Calling this in pipeline code leads to inaccurate metrics.
+func Clear() {
+ mu.Lock()
+ store = make(map[string]map[string]map[name]userMetric)
+ counters = sync.Map{}
+ distributions = sync.Map{}
+ gauges = sync.Map{}
+ mu.Unlock()
+}
+
+// ClearBundleData removes stored references associated with a given bundle,
+// so it can be garbage collected.
+func ClearBundleData(b string) {
+ // No concurrency races since mu guards all access to store,
+ // and the metric cell sync.Maps are goroutine safe.
+ mu.Lock()
+ pts := store[b]
+ for pt, m := range pts {
+ for n := range m {
+ key := key{name: n, bundle: b, ptransform: pt}
+ counters.Delete(key)
+ distributions.Delete(key)
+ gauges.Delete(key)
+ }
+ }
+ delete(store, b)
+ mu.Unlock()
+}
diff --git a/sdks/go/pkg/beam/core/metrics/metrics_test.go
b/sdks/go/pkg/beam/core/metrics/metrics_test.go
new file mode 100644
index 00000000000..80724df2580
--- /dev/null
+++ b/sdks/go/pkg/beam/core/metrics/metrics_test.go
@@ -0,0 +1,358 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metrics
+
+import (
+ "context"
+ "fmt"
+ "testing"
+ "time"
+)
+
+// bID is a bundleId to use in the tests, if nothing more specific is needed.
+const bID = "bID"
+
+func ctxWith(b, pt string) context.Context {
+ ctx := context.Background()
+ ctx = SetPTransformID(ctx, pt)
+ ctx = SetBundleID(ctx, b)
+ return ctx
+}
+
+func TestCounter_Inc(t *testing.T) {
+ tests := []struct {
+ ns, n, key string // Counter name and PTransform context
+ inc int64
+ value int64 // Internal variable to check
+ }{
+ {ns: "inc1", n: "count", key: "A", inc: 1, value: 1},
+ {ns: "inc1", n: "count", key: "A", inc: 1, value: 2},
+ {ns: "inc1", n: "ticker", key: "A", inc: 1, value: 1},
+ {ns: "inc1", n: "ticker", key: "A", inc: 2, value: 3},
+ {ns: "inc1", n: "count", key: "B", inc: 1, value: 1},
+ {ns: "inc1", n: "count", key: "B", inc: 1, value: 2},
+ {ns: "inc1", n: "ticker", key: "B", inc: 1, value: 1},
+ {ns: "inc1", n: "ticker", key: "B", inc: 2, value: 3},
+ {ns: "inc2", n: "count", key: "A", inc: 1, value: 1},
+ {ns: "inc2", n: "count", key: "A", inc: 1, value: 2},
+ {ns: "inc2", n: "ticker", key: "A", inc: 1, value: 1},
+ {ns: "inc2", n: "ticker", key: "A", inc: 2, value: 3},
+ {ns: "inc2", n: "count", key: "B", inc: 1, value: 1},
+ {ns: "inc2", n: "count", key: "B", inc: 1, value: 2},
+ {ns: "inc2", n: "ticker", key: "B", inc: 1, value: 1},
+ {ns: "inc2", n: "ticker", key: "B", inc: 2, value: 3},
+ }
+
+ for _, test := range tests {
+ t.Run(fmt.Sprintf("add %d to %s.%s[%q] value: %d", test.inc,
test.ns, test.n, test.key, test.value),
+ func(t *testing.T) {
+ m := NewCounter(test.ns, test.n)
+ ctx := ctxWith(bID, test.key)
+ m.Inc(ctx, test.inc)
+
+ key := key{name: name{namespace: test.ns, name:
test.n}, bundle: bID, ptransform: test.key}
+ q, ok := counters.Load(key)
+ if !ok {
+ t.Fatalf("Unable to find Counter for
key %v", key)
+ }
+ c := q.(*counter)
+ if got, want := c.value, test.value; got !=
want {
+ t.Errorf("GetCounter(%q,%q).Inc(%s, %d)
c.value got %v, want %v", test.ns, test.n, test.key, test.inc, got, want)
+ }
+ })
+ }
+}
+
+func TestCounter_Dec(t *testing.T) {
+ tests := []struct {
+ ns, n, key string // Counter name and PTransform context
+ dec int64
+ value int64 // Internal variable to check
+ }{
+ {ns: "dec1", n: "count", key: "A", dec: 1, value: -1},
+ {ns: "dec1", n: "count", key: "A", dec: 1, value: -2},
+ {ns: "dec1", n: "ticker", key: "A", dec: 1, value: -1},
+ {ns: "dec1", n: "ticker", key: "A", dec: 2, value: -3},
+ {ns: "dec1", n: "count", key: "B", dec: 1, value: -1},
+ {ns: "dec1", n: "count", key: "B", dec: 1, value: -2},
+ {ns: "dec1", n: "ticker", key: "B", dec: 1, value: -1},
+ {ns: "dec1", n: "ticker", key: "B", dec: 2, value: -3},
+ {ns: "dec2", n: "count", key: "A", dec: 1, value: -1},
+ {ns: "dec2", n: "count", key: "A", dec: 1, value: -2},
+ {ns: "dec2", n: "ticker", key: "A", dec: 1, value: -1},
+ {ns: "dec2", n: "ticker", key: "A", dec: 2, value: -3},
+ {ns: "dec2", n: "count", key: "B", dec: 1, value: -1},
+ {ns: "dec2", n: "count", key: "B", dec: 1, value: -2},
+ {ns: "dec2", n: "ticker", key: "B", dec: 1, value: -1},
+ {ns: "dec2", n: "ticker", key: "B", dec: 2, value: -3},
+ }
+
+ for _, test := range tests {
+ t.Run(fmt.Sprintf("subtract %d to %s.%s[%q] value: %d",
test.dec, test.ns, test.n, test.key, test.value),
+ func(t *testing.T) {
+ m := NewCounter(test.ns, test.n)
+ ctx := ctxWith(bID, test.key)
+ m.Dec(ctx, test.dec)
+
+ key := key{name: name{namespace: test.ns, name:
test.n}, bundle: bID, ptransform: test.key}
+ q, ok := counters.Load(key)
+ if !ok {
+ t.Fatalf("Unable to find Counter for
key %v", key)
+ }
+ c := q.(*counter)
+ if got, want := c.value, test.value; got !=
want {
+ t.Errorf("GetCounter(%q,%q).Dec(%s, %d)
c.value got %v, want %v", test.ns, test.n, test.key, test.dec, got, want)
+ }
+ })
+ }
+}
+
+func TestDistribution_Update(t *testing.T) {
+ tests := []struct {
+ ns, n, key string // Gauge name and PTransform context
+ v int64
+ count, sum, min, max int64 // Internal variables to check
+ }{
+ {ns: "update1", n: "latency", key: "A", v: 1, count: 1, sum: 1,
min: 1, max: 1},
+ {ns: "update1", n: "latency", key: "A", v: 1, count: 2, sum: 2,
min: 1, max: 1},
+ {ns: "update1", n: "latency", key: "A", v: 1, count: 3, sum: 3,
min: 1, max: 1},
+ {ns: "update1", n: "size", key: "A", v: 1, count: 1, sum: 1,
min: 1, max: 1},
+ {ns: "update1", n: "size", key: "A", v: 2, count: 2, sum: 3,
min: 1, max: 2},
+ {ns: "update1", n: "size", key: "A", v: 3, count: 3, sum: 6,
min: 1, max: 3},
+ {ns: "update1", n: "size", key: "A", v: -4, count: 4, sum: 2,
min: -4, max: 3},
+ {ns: "update1", n: "size", key: "A", v: 1, count: 5, sum: 3,
min: -4, max: 3},
+ {ns: "update1", n: "latency", key: "B", v: 1, count: 1, sum: 1,
min: 1, max: 1},
+ {ns: "update1", n: "latency", key: "B", v: 1, count: 2, sum: 2,
min: 1, max: 1},
+ {ns: "update1", n: "size", key: "B", v: 1, count: 1, sum: 1,
min: 1, max: 1},
+ {ns: "update1", n: "size", key: "B", v: 2, count: 2, sum: 3,
min: 1, max: 2},
+ {ns: "update2", n: "latency", key: "A", v: 1, count: 1, sum: 1,
min: 1, max: 1},
+ {ns: "update2", n: "latency", key: "A", v: 1, count: 2, sum: 2,
min: 1, max: 1},
+ {ns: "update2", n: "size", key: "A", v: 1, count: 1, sum: 1,
min: 1, max: 1},
+ {ns: "update2", n: "size", key: "A", v: 2, count: 2, sum: 3,
min: 1, max: 2},
+ {ns: "update2", n: "latency", key: "B", v: 1, count: 1, sum: 1,
min: 1, max: 1},
+ {ns: "update2", n: "latency", key: "B", v: 1, count: 2, sum: 2,
min: 1, max: 1},
+ {ns: "update2", n: "size", key: "B", v: 1, count: 1, sum: 1,
min: 1, max: 1},
+ {ns: "update2", n: "size", key: "B", v: 2, count: 2, sum: 3,
min: 1, max: 2},
+ {ns: "update1", n: "size", key: "A", v: 1, count: 6, sum: 4,
min: -4, max: 3},
+ }
+
+ for _, test := range tests {
+ t.Run(fmt.Sprintf("add %d to %s.%s[%q] count: %d sum: %d",
test.v, test.ns, test.n, test.key, test.count, test.sum),
+ func(t *testing.T) {
+ m := NewDistribution(test.ns, test.n)
+ ctx := ctxWith(bID, test.key)
+ m.Update(ctx, test.v)
+
+ key := key{name: name{namespace: test.ns, name:
test.n}, bundle: bID, ptransform: test.key}
+ q, ok := distributions.Load(key)
+ if !ok {
+ t.Fatalf("Unable to find Distribution
for key %v", key)
+ }
+ d := q.(*distribution)
+ if got, want := d.count, test.count; got !=
want {
+
t.Errorf("GetDistribution(%q,%q).Update(%s, %d) d.count got %v, want %v",
test.ns, test.n, test.key, test.v, got, want)
+ }
+ if got, want := d.sum, test.sum; got != want {
+
t.Errorf("GetDistribution(%q,%q).Update(%s, %d) d.sum got %v, want %v",
test.ns, test.n, test.key, test.v, got, want)
+ }
+ if got, want := d.min, test.min; got != want {
+
t.Errorf("GetDistribution(%q,%q).Update(%s, %d) d.min got %v, want %v",
test.ns, test.n, test.key, test.v, got, want)
+ }
+ if got, want := d.max, test.max; got != want {
+
t.Errorf("GetDistribution(%q,%q).Update(%s, %d) d.max got %v, want %v",
test.ns, test.n, test.key, test.v, got, want)
+ }
+ })
+ }
+}
+
+func testclock(t time.Time) func() time.Time {
+ return func() time.Time { return t }
+}
+
+func TestGauge_Set(t *testing.T) {
+ tests := []struct {
+ ns, n, key string // Gauge name and PTransform context
+ v int64
+ t time.Time
+ }{
+ {ns: "set1", n: "load", key: "A", v: 1, t: time.Unix(0, 0)},
+ {ns: "set1", n: "load", key: "A", v: 1, t: time.Unix(0, 0)},
+ {ns: "set1", n: "speed", key: "A", v: 1, t: time.Unix(0, 0)},
+ {ns: "set1", n: "speed", key: "A", v: 2, t: time.Unix(0, 0)},
+ {ns: "set1", n: "load", key: "B", v: 1, t: time.Unix(0, 0)},
+ {ns: "set1", n: "load", key: "B", v: 1, t: time.Unix(0, 0)},
+ {ns: "set1", n: "speed", key: "B", v: 1, t: time.Unix(0, 0)},
+ {ns: "set1", n: "speed", key: "B", v: 2, t: time.Unix(0, 0)},
+ {ns: "set2", n: "load", key: "A", v: 1, t: time.Unix(0, 0)},
+ {ns: "set2", n: "load", key: "A", v: 1, t: time.Unix(0, 0)},
+ {ns: "set2", n: "speed", key: "A", v: 1, t: time.Unix(0, 0)},
+ {ns: "set2", n: "speed", key: "A", v: 2, t: time.Unix(0, 0)},
+ {ns: "set2", n: "load", key: "B", v: 1, t: time.Unix(0, 0)},
+ {ns: "set2", n: "load", key: "B", v: 1, t: time.Unix(0, 0)},
+ {ns: "set2", n: "speed", key: "B", v: 1, t: time.Unix(0, 0)},
+ {ns: "set2", n: "speed", key: "B", v: 2, t: time.Unix(0, 0)},
+ }
+
+ for _, test := range tests {
+ t.Run(fmt.Sprintf("set %s.%s[%q] to %d at %v", test.ns, test.n,
test.key, test.v, test.t),
+ func(t *testing.T) {
+ m := NewGauge(test.ns, test.n)
+ ctx := ctxWith(bID, test.key)
+ now = testclock(test.t)
+ m.Set(ctx, test.v)
+
+ key := key{name: name{namespace: test.ns, name:
test.n}, bundle: bID, ptransform: test.key}
+ q, ok := gauges.Load(key)
+ if !ok {
+ t.Fatalf("Unable to find Gauge for key
%v", key)
+ }
+ g := q.(*gauge)
+ if got, want := g.v, test.v; got != want {
+ t.Errorf("GetGauge(%q,%q).Set(%s, %d)
g.v got %v, want %v", test.ns, test.n, test.key, test.v, got, want)
+ }
+ if got, want := g.t, test.t; got != want {
+ t.Errorf("GetGauge(%q,%q).Set(%s, %d)
t.t got %v, want %v", test.ns, test.n, test.key, test.v, got, want)
+ }
+ })
+ }
+}
+
+type metricType uint8
+
+const (
+ counterType metricType = iota
+ distributionType
+ gaugeType
+)
+
+func (t metricType) String() string {
+ switch t {
+ case counterType:
+ return "Counter"
+ case distributionType:
+ return "Distribution"
+ case gaugeType:
+ return "Gauge"
+ default:
+ panic(fmt.Sprintf("Unknown metric type value: %v", uint8(t)))
+ }
+}
+func TestNameCollisions(t *testing.T) {
+ ns, c, d, g := "collisions", "counter", "distribution", "gauge"
+ // Checks that user code panics if a counter attempts to be defined in
the same PTransform
+ // Collisions are unfortunately only detectable at runtime, and only if
both the initial
+ // metric, and the new metric are actually used, since we don't know
the context until
+ // then.
+ // Pre-create and use so that we have existing metrics to collide with.
+ NewCounter(ns, c).Inc(ctxWith(bID, c), 1)
+ NewDistribution(ns, d).Update(ctxWith(bID, d), 1)
+ NewGauge(ns, g).Set(ctxWith(bID, g), 1)
+ tests := []struct {
+ existing, new metricType
+ }{
+ {existing: counterType, new: counterType},
+ {existing: counterType, new: distributionType},
+ {existing: counterType, new: gaugeType},
+ {existing: distributionType, new: counterType},
+ {existing: distributionType, new: distributionType},
+ {existing: distributionType, new: gaugeType},
+ {existing: gaugeType, new: counterType},
+ {existing: gaugeType, new: distributionType},
+ {existing: gaugeType, new: gaugeType},
+ }
+ for _, test := range tests {
+ t.Run(fmt.Sprintf("%s name collides with %s", test.existing,
test.new),
+ func(t *testing.T) {
+ defer func() {
+ if test.existing != test.new {
+ if e := recover(); e != nil {
+ t.Logf("panic caught
re-using a name between a %s, and a %s", test.existing, test.new)
+ return
+ }
+ t.Error("panic expected")
+ } else {
+ t.Log("reusing names is fine
when the metrics the same type:", test.existing, test.new)
+ }
+ }()
+ var name string
+ switch test.existing {
+ case counterType:
+ name = c
+ case distributionType:
+ name = d
+ case gaugeType:
+ name = g
+ default:
+ t.Fatalf("unknown existing metricType
with value: %v", int(test.existing))
+ }
+ switch test.new {
+ case counterType:
+ NewCounter(ns, name).Inc(ctxWith(bID,
name), 1)
+ case distributionType:
+ NewDistribution(ns,
name).Update(ctxWith(bID, name), 1)
+ case gaugeType:
+ NewGauge(ns, name).Set(ctxWith(bID,
name), 1)
+ default:
+ t.Fatalf("unknown new metricType with
value: %v", int(test.new))
+ }
+
+ })
+ }
+}
+
+func TestClearBundleData(t *testing.T) {
+ Clear()
+ dump := func(t *testing.T) {
+ dumpTo(func(format string, args ...interface{}) {
+ t.Logf(format, args...)
+ })
+ }
+ pt, c, d, g := "clear.bundle.data", "counter", "distribution", "gauge"
+ aBundleID := "aBID"
+ otherBundleID := "otherBID"
+ NewCounter(pt, c).Inc(ctxWith(aBundleID, pt), 1)
+ NewDistribution(pt, d).Update(ctxWith(aBundleID, pt), 1)
+ NewGauge(pt, g).Set(ctxWith(aBundleID, pt), 1)
+
+ NewCounter(pt, c).Inc(ctxWith(otherBundleID, pt), 1)
+ NewDistribution(pt, d).Update(ctxWith(otherBundleID, pt), 1)
+ NewGauge(pt, g).Set(ctxWith(otherBundleID, pt), 1)
+
+ initialAP := ToProto(aBundleID, pt)
+ if got, want := len(initialAP), 3; got != want {
+ dump(t)
+ t.Fatalf("len(ToProto(%q, %q)) = %v, want %v - initialAP: %v",
aBundleID, pt, got, want, initialAP)
+ }
+ initialOP := ToProto(otherBundleID, pt)
+ if got, want := len(initialOP), 3; got != want {
+ dump(t)
+ t.Fatalf("len(ToProto(%q, %q)) = %v, want %v - initialOP: %v",
otherBundleID, pt, got, want, initialOP)
+ }
+
+ ClearBundleData(aBundleID)
+
+ newAP := ToProto(aBundleID, pt)
+ if got, want := len(newAP), 0; got != want {
+ dump(t)
+ t.Fatalf("len(ToProto(%q, %q)) = %v, want %v - newAP: %v",
aBundleID, pt, got, want, newAP)
+ }
+
+ newOP := ToProto(otherBundleID, pt)
+ if got, want := len(newOP), 3; got != want {
+ dump(t)
+ t.Fatalf("len(ToProto(%q, %q)) = %v, want %v - newOP: %v",
otherBundleID, pt, got, want, newOP)
+ }
+}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go
b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
index f0d0a561357..a2f7d7ba4b2 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
@@ -23,6 +23,7 @@ import (
"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/core/funcx"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/go/pkg/beam/util/errorx"
)
@@ -35,6 +36,7 @@ type ParDo struct {
Side []ReStream
Out []Node
+ PID string
ready bool
sideinput []ReusableInput
emitters []ReusableEmitter
@@ -92,6 +94,8 @@ func (n *ParDo) ProcessElement(ctx context.Context, elm
FullValue, values ...ReS
return fmt.Errorf("invalid status for pardo %v: %v, want
Active", n.UID, n.status)
}
+ ctx = metrics.SetPTransformID(ctx, n.PID)
+
val, err := n.invokeDataFn(ctx, elm.Timestamp, n.Fn.ProcessElementFn(),
&MainInput{Key: elm, Values: values})
if err != nil {
return n.fail(err)
diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go
b/sdks/go/pkg/beam/core/runtime/exec/plan.go
index 54dc812c5fe..f9168212aa1 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/plan.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go
@@ -21,15 +21,19 @@ import (
"context"
"fmt"
"strings"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
+ fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
)
// Plan represents the bundle execution plan. It will generally be constructed
// from a part of a pipeline. A plan can be used to process multiple bundles
// serially.
type Plan struct {
- id string
- roots []Root
- units []Unit
+ id string
+ roots []Root
+ units []Unit
+ parDoIds []string
status Status
@@ -41,6 +45,7 @@ type Plan struct {
func NewPlan(id string, units []Unit) (*Plan, error) {
var roots []Root
var source *DataSource
+ var pardoIDs []string
for _, u := range units {
if u == nil {
@@ -52,17 +57,21 @@ func NewPlan(id string, units []Unit) (*Plan, error) {
if s, ok := u.(*DataSource); ok {
source = s
}
+ if p, ok := u.(*ParDo); ok {
+ pardoIDs = append(pardoIDs, p.PID)
+ }
}
if len(roots) == 0 {
return nil, fmt.Errorf("no root units")
}
return &Plan{
- id: id,
- status: Initializing,
- roots: roots,
- units: units,
- source: source,
+ id: id,
+ status: Initializing,
+ roots: roots,
+ units: units,
+ parDoIds: pardoIDs,
+ source: source,
}, nil
}
@@ -75,6 +84,7 @@ func (p *Plan) ID() string {
// are brought up on the first execution. If a bundle fails, the plan cannot
// be reused for further bundles. Does not panic. Blocking.
func (p *Plan) Execute(ctx context.Context, id string, manager DataManager)
error {
+ ctx = metrics.SetBundleID(ctx, p.id)
if p.status == Initializing {
for _, u := range p.units {
if err := callNoPanic(ctx, u.Up); err != nil {
@@ -147,7 +157,28 @@ func (p *Plan) String() string {
return fmt.Sprintf("Plan[%v]:\n%v", p.ID(), strings.Join(units, "\n"))
}
-// ProgressReport returns a snapshot of input progress of the plan.
-func (p *Plan) ProgressReport() ProgressReportSnapshot {
- return p.source.Progress()
+// Metrics returns a snapshot of input progress of the plan, and associated
metrics.
+func (p *Plan) Metrics() *fnpb.Metrics {
+ snapshot := p.source.Progress()
+
+ transforms := map[string]*fnpb.Metrics_PTransform{
+ snapshot.ID: &fnpb.Metrics_PTransform{
+ ProcessedElements:
&fnpb.Metrics_PTransform_ProcessedElements{
+ Measured: &fnpb.Metrics_PTransform_Measured{
+ OutputElementCounts: map[string]int64{
+ snapshot.Name: snapshot.Count,
+ },
+ },
+ },
+ },
+ }
+
+ for _, pt := range p.parDoIds {
+ transforms[pt] = &fnpb.Metrics_PTransform{
+ User: metrics.ToProto(p.id, pt),
+ }
+ }
+ return &fnpb.Metrics{
+ Ptransforms: transforms,
+ }
}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go
b/sdks/go/pkg/beam/core/runtime/exec/translate.go
index 291ef3a0d1d..946f216fd5b 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go
@@ -17,6 +17,7 @@ package exec
import (
"fmt"
+ "path"
"strconv"
"strings"
@@ -295,11 +296,13 @@ func (b *builder) makeLink(from string, id linkID) (Node,
error) {
switch op {
case graph.ParDo:
- n := &ParDo{UID: b.idgen.New(), Inbound: in,
Out: out}
+ n := &ParDo{UID: b.idgen.New(), PID: id.to,
Inbound: in, Out: out}
n.Fn, err = graph.AsDoFn(fn)
if err != nil {
return nil, err
}
+ // TODO(lostluck): 2018/03/22 Look into why
transform.UniqueName isn't populated at this point, and switch n.PID to that
instead.
+ n.PID = path.Base(n.Fn.Name())
if len(in) == 1 {
u = n
break
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go
b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index 57a8854591d..5e545c550f9 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -154,7 +154,7 @@ type control struct {
func (c *control) handleInstruction(ctx context.Context, req
*fnpb.InstructionRequest) *fnpb.InstructionResponse {
id := req.GetInstructionId()
- ctx = context.WithValue(ctx, instKey, id)
+ ctx = setInstID(ctx, id)
switch {
case req.GetRegister() != nil:
@@ -202,7 +202,7 @@ func (c *control) handleInstruction(ctx context.Context,
req *fnpb.InstructionRe
}
err := plan.Execute(ctx, id, c.data)
-
+ m := plan.Metrics()
// Move the plan back to the candidate state
c.mu.Lock()
c.plans[plan.ID()] = plan
@@ -216,7 +216,9 @@ func (c *control) handleInstruction(ctx context.Context,
req *fnpb.InstructionRe
return &fnpb.InstructionResponse{
InstructionId: id,
Response: &fnpb.InstructionResponse_ProcessBundle{
- ProcessBundle: &fnpb.ProcessBundleResponse{},
+ ProcessBundle: &fnpb.ProcessBundleResponse{
+ Metrics: m,
+ },
},
}
@@ -233,25 +235,13 @@ func (c *control) handleInstruction(ctx context.Context,
req *fnpb.InstructionRe
return fail(id, "execution plan for %v not found", ref)
}
- snapshot := plan.ProgressReport()
+ m := plan.Metrics()
return &fnpb.InstructionResponse{
InstructionId: id,
Response:
&fnpb.InstructionResponse_ProcessBundleProgress{
ProcessBundleProgress:
&fnpb.ProcessBundleProgressResponse{
- Metrics: &fnpb.Metrics{
- Ptransforms:
map[string]*fnpb.Metrics_PTransform{
- snapshot.ID:
&fnpb.Metrics_PTransform{
-
ProcessedElements: &fnpb.Metrics_PTransform_ProcessedElements{
-
Measured: &fnpb.Metrics_PTransform_Measured{
-
OutputElementCounts: map[string]int64{
-
snapshot.Name: snapshot.Count,
-
},
- },
- },
- },
- },
- },
+ Metrics: m,
},
},
}
diff --git a/sdks/go/pkg/beam/core/runtime/harness/logging.go
b/sdks/go/pkg/beam/core/runtime/harness/logging.go
index 68e225e246d..f7608cdffde 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/logging.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/logging.go
@@ -32,8 +32,9 @@ import (
// TODO(herohde) 10/13/2017: add top-level harness.Main panic handler that
flushes logs.
// Also make logger flush on Fatal severity messages.
+type contextKey string
-const instKey = "beam:inst"
+const instKey contextKey = "beam:inst"
func setInstID(ctx context.Context, id string) context.Context {
return context.WithValue(ctx, instKey, id)
diff --git a/sdks/go/pkg/beam/metrics.go b/sdks/go/pkg/beam/metrics.go
new file mode 100644
index 00000000000..5e5bef6ecba
--- /dev/null
+++ b/sdks/go/pkg/beam/metrics.go
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package beam
+
+import (
+ "context"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
+)
+
+// Counter is a metric that can be incremented and decremented,
+// and is aggregated by the sum.
+type Counter struct {
+ metrics.Counter
+}
+
+// Inc increments the counter within by the given amount.
+func (c Counter) Inc(ctx context.Context, v int64) {
+ c.Counter.Inc(ctx, v)
+}
+
+// Dec decrements the counter within by the given amount.
+func (c Counter) Dec(ctx context.Context, v int64) {
+ c.Counter.Dec(ctx, v)
+}
+
+// NewCounter returns the Counter with the given namespace and name.
+func NewCounter(namespace, name string) Counter {
+ return Counter{metrics.NewCounter(namespace, name)}
+}
+
+// Distribution is a metric that records various statistics about the
distribution
+// of reported values.
+type Distribution struct {
+ metrics.Distribution
+}
+
+// Update adds an observation to this distribution.
+func (c Distribution) Update(ctx context.Context, v int64) {
+ c.Distribution.Update(ctx, v)
+}
+
+// NewDistribution returns the Distribution with the given namespace and name.
+func NewDistribution(namespace, name string) Distribution {
+ return Distribution{metrics.NewDistribution(namespace, name)}
+}
+
+// Gauge is a metric that can have its new value set, and is aggregated by
taking
+// the last reported value.
+type Gauge struct {
+ metrics.Gauge
+}
+
+// Set sets the current value for this gauge.
+func (c Gauge) Set(ctx context.Context, v int64) {
+ c.Gauge.Set(ctx, v)
+}
+
+// NewGauge returns the Gauge with the given namespace and name.
+func NewGauge(namespace, name string) Gauge {
+ return Gauge{metrics.NewGauge(namespace, name)}
+}
diff --git a/sdks/go/pkg/beam/metrics_test.go b/sdks/go/pkg/beam/metrics_test.go
new file mode 100644
index 00000000000..11b44101671
--- /dev/null
+++ b/sdks/go/pkg/beam/metrics_test.go
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package beam_test
+
+import (
+ "context"
+ "regexp"
+ "time"
+
+ "github.com/apache/beam/sdks/go/pkg/beam"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
+)
+
+// A beam_test global context var to improve how the examples look.
+var ctx = context.Background()
+
+func ctxWithPtransformID(id string) context.Context {
+ ctx := context.Background()
+ ctx = metrics.SetPTransformID(ctx, id)
+ ctx = metrics.SetBundleID(ctx, "exampleBundle")
+ return ctx
+}
+
+func dumpAndClearMetrics() {
+ metrics.DumpToOut()
+ metrics.Clear()
+}
+
+var (
+ wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+)
+
+func Example_metricsDeclaredAnywhere() {
+
+ // Metrics can be declared outside DoFns, and used inside..
+ outside := beam.NewCounter("example.namespace", "count")
+
+ extractWordsDofn := func(ctx context.Context, line string, emit
func(string)) {
+ // They can be defined at time of use within a DoFn, if
necessary.
+ inside := beam.NewDistribution("example.namespace",
"characters")
+ for _, word := range wordRE.FindAllString(line, -1) {
+ emit(word)
+ outside.Inc(ctx, 1)
+ inside.Update(ctx, int64(len(word)))
+ }
+ }
+ ctx := ctxWithPtransformID("example")
+ extractWordsDofn(ctx, "this has six words in it", func(string) {})
+ extractWordsDofn(ctx, "this has seven words in it, see?", func(string)
{})
+
+ dumpAndClearMetrics()
+ // Output: Bundle: "exampleBundle" - PTransformID: "example"
+ // example.namespace.characters - count: 13 sum: 43 min: 2 max: 5
+ // example.namespace.count - value: 13
+}
+
+func Example_metricsReusable() {
+
+ // Metrics can be used in multiple DoFns
+ c := beam.NewCounter("example.reusable", "count")
+
+ extractWordsDofn := func(ctx context.Context, line string, emit
func(string)) {
+ for _, word := range wordRE.FindAllString(line, -1) {
+ emit(word)
+ c.Inc(ctx, 1)
+ }
+ }
+
+ extractRunesDofn := func(ctx context.Context, line string, emit
func(rune)) {
+ for _, r := range line {
+ emit(r)
+ c.Inc(ctx, 1)
+ }
+ }
+ extractWordsDofn(ctxWithPtransformID("extract1"), "this has six words
in it", func(string) {})
+
+ extractRunesDofn(ctxWithPtransformID("extract2"), "seven thousand",
func(rune) {})
+
+ dumpAndClearMetrics()
+ // Output: Bundle: "exampleBundle" - PTransformID: "extract1"
+ // example.reusable.count - value: 6
+ // Bundle: "exampleBundle" - PTransformID: "extract2"
+ // example.reusable.count - value: 14
+}
+
+func ExampleCounter_Inc() {
+ c := beam.NewCounter("example", "size")
+ c.Inc(ctx, int64(len("foobar")))
+}
+
+func ExampleCounter_Dec() {
+ c := beam.NewCounter("example", "size")
+ c.Dec(ctx, int64(len("foobar")))
+}
+
+func ExampleDistribution_Update() {
+ t := time.Millisecond * 42
+ d := beam.NewDistribution("example", "latency_micros")
+ d.Update(ctx, int64(t/time.Microsecond))
+}
+
+func ExampleGauge_Set() {
+ g := beam.NewGauge("example", "progress")
+ g.Set(ctx, 42)
+}
diff --git a/sdks/go/pkg/beam/runners/direct/direct.go
b/sdks/go/pkg/beam/runners/direct/direct.go
index 78945daa0cc..1eb7c654669 100644
--- a/sdks/go/pkg/beam/runners/direct/direct.go
+++ b/sdks/go/pkg/beam/runners/direct/direct.go
@@ -18,9 +18,11 @@ package direct
import (
"context"
"fmt"
+ "path"
"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+ "github.com/apache/beam/sdks/go/pkg/beam/core/metrics"
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
"github.com/apache/beam/sdks/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/go/pkg/beam/log"
@@ -48,7 +50,11 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
plan.Down(ctx) // ignore any teardown errors
return err
}
- return plan.Down(ctx)
+ if err = plan.Down(ctx); err != nil {
+ return err
+ }
+ metrics.DumpToLog(ctx)
+ return nil
}
// Compile translates a pipeline to a multi-bundle execution plan.
@@ -206,6 +212,7 @@ func (b *builder) makeLink(id linkID) (exec.Node, error) {
switch edge.Op {
case graph.ParDo:
pardo := &exec.ParDo{UID: b.idgen.New(), Fn: edge.DoFn,
Inbound: edge.Input, Out: out}
+ pardo.PID = path.Base(pardo.Fn.Name())
if len(edge.Input) == 1 {
u = pardo
break
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 85089)
Time Spent: 9.5h (was: 9h 20m)
> Fn API metrics in Go SDK harness
> --------------------------------
>
> Key: BEAM-3545
> URL: https://issues.apache.org/jira/browse/BEAM-3545
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-go
> Reporter: Kenneth Knowles
> Assignee: Robert Burke
> Priority: Major
> Labels: portability
> Time Spent: 9.5h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)